How to fix "Error opening block StreamChunkId" on external spark shuffle service

1/14/2020

I'm trying to run Spark jobs from my Zeppelin deployment in a Kubernetes cluster. I also have a Spark shuffle service (a daemonset, v2.2.0-k8s) running in a different namespace. Here are my Spark configs (set on the Zeppelin pod):

--conf spark.kubernetes.executor.docker.image=<spark-executor> 
--conf spark.executor.cores=5
--conf spark.driver.memory=5g
--conf spark.executor.memory=5g
--conf spark.kubernetes.authenticate.driver.serviceAccountName=<svc-account> 
--conf spark.local.dir=/tmp/spark-local 
--conf spark.executor.instances=5 
--conf spark.dynamicAllocation.enabled=true 
--conf spark.shuffle.service.enabled=true 
--conf spark.kubernetes.shuffle.labels="app=spark-shuffle,spark-version=2.2.0" 
--conf spark.dynamicAllocation.maxExecutors=5   
--conf spark.dynamicAllocation.minExecutors=1 
--conf spark.kubernetes.shuffle.namespace=<namespace> 
--conf spark.kubernetes.docker.image.pullPolicy=IfNotPresent 
--conf spark.kubernetes.initcontainer.docker.image=kubespark/spark-init:v2.2.0-kubernetes-0.5.0 
--conf spark.kubernetes.resourceStagingServer.uri=<ip:port>

But I get the following logs from the external spark-shuffle pods and the Spark executors spawned by Zeppelin:

+ /sbin/tini -s -- /opt/spark/bin/spark-class org.apache.spark.deploy.k8s.KubernetesExternalShuffleService 1
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/spark/jars/kubernetes-client-3.0.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2020-01-14 03:37:31 INFO  ExternalShuffleService:2574 - Started daemon with process name: 10@unawa2-shuffle-unawa2-spark-shuffle-d5cfg
2020-01-14 03:37:31 INFO  SignalUtils:54 - Registered signal handler for TERM
2020-01-14 03:37:31 INFO  SignalUtils:54 - Registered signal handler for HUP
2020-01-14 03:37:31 INFO  SignalUtils:54 - Registered signal handler for INT
2020-01-14 03:37:31 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-01-14 03:37:31 INFO  SecurityManager:54 - Changing view acls to: root
2020-01-14 03:37:31 INFO  SecurityManager:54 - Changing modify acls to: root
2020-01-14 03:37:31 INFO  SecurityManager:54 - Changing view acls groups to:
2020-01-14 03:37:31 INFO  SecurityManager:54 - Changing modify acls groups to:
2020-01-14 03:37:31 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
2020-01-14 03:37:32 INFO  KubernetesExternalShuffleService:54 - Starting shuffle service on port 7337 (auth enabled = false)
2020-01-14 03:38:35 INFO  KubernetesShuffleBlockHandler:54 - Received registration request from app spark-application-1578973110574 (remote address /192.168.2.37:40318).
2020-01-14 03:38:36 INFO  ExternalShuffleBlockResolver:135 - Registered executor AppExecId{appId=spark-application-1578973110574, execId=5} with ExecutorShuffleInfo{localDirs=[/tmp/spark-local/blockmgr-8a26a714-3ecb-46dd-8499-ff796fa97744], subDirsPerLocalDir=64, shuffleManager=org.apache.spark.shuffle.sort.SortShuffleManager}
2020-01-14 03:39:15 ERROR TransportRequestHandler:127 - Error opening block StreamChunkId{streamId=527834012000, chunkIndex=0} for request from /192.168.3.130:50896
java.lang.RuntimeException: Failed to open file: /tmp/spark-local/blockmgr-8a26a714-3ecb-46dd-8499-ff796fa97744/0f/shuffle_1_0_0.index
        at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:249)
        at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:174)
        at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler$1.next(ExternalShuffleBlockHandler.java:105)
        at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler$1.next(ExternalShuffleBlockHandler.java:95)
        at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:89)
        at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:125)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:103)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
...
Caused by: java.util.concurrent.ExecutionException: java.io.FileNotFoundException: /tmp/spark-local/blockmgr-8a26a714-3ecb-46dd-8499-ff796fa97744/0f/shuffle_1_0_0.index (No such file or directory)
        at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
        at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
        at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
        at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)

Any idea how to fix this?

[EDIT]

I mounted the local dir /tmp/spark-local into my pods. When I SSH into each node, I can confirm that the block manager directory exists on one of the worker nodes (I'm guessing this is the expected behavior). The error occurs when a shuffle pod on another worker node tries to access that block manager.
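For context, this is the shape of the mount I'm describing: the shuffle daemonset pod and the executor pods on each node must share the same hostPath directory, so the shuffle service can see the files executors write locally. A minimal sketch (container and volume names are illustrative, not from my actual manifests):

# excerpt from the shuffle daemonset pod spec; the executor pods
# need to mount the same hostPath at the same mount path,
# matching spark.local.dir (/tmp/spark-local)
spec:
  template:
    spec:
      containers:
        - name: spark-shuffle
          volumeMounts:
            - name: spark-local
              mountPath: /tmp/spark-local
      volumes:
        - name: spark-local
          hostPath:
            path: /tmp/spark-local

Since the daemonset runs one shuffle pod per node, each shuffle pod should then only ever serve the blocks written by executors on its own node.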

-- Joshua Villanueva
apache-spark
apache-zeppelin
kubernetes
scala

1 Answer

2/24/2020

A summary of the comments thread.

In order to run Spark on Kubernetes with dynamic allocation enabled, you can:

Follow the apache-spark-on-k8s guide

Important notes:

  • You should base your images on the kubespark images, which are built from the forked Apache Spark 2.2.0
  • The feature is experimental and not officially supported

Alternatively, wait until Spark 3.0.0 is released with SPARK-24432 and SPARK-25299.
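For illustration, with that work dynamic allocation is expected to run without an external shuffle service at all, driven by shuffle tracking on the executors. A sketch of what the configuration might look like (flag names taken from the current Spark master branch and may still change before release):

--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.shuffleTracking.enabled=true
--conf spark.dynamicAllocation.minExecutors=1
--conf spark.dynamicAllocation.maxExecutors=5

With shuffle tracking, executors holding shuffle data are simply kept alive until that data is no longer needed, so no shuffle daemonset or hostPath sharing is required.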

-- Aliaksandr Sasnouskikh
Source: StackOverflow