Spark executors fail to run on Kubernetes cluster

1/31/2021

I am trying to run a simple Spark job on a Kubernetes cluster. I deployed a pod that starts a PySpark shell, and in that shell I am changing the Spark configuration as specified below:

>>> sc.stop()
>>> sparkConf = SparkConf()
>>> sparkConf.setMaster("k8s://https://kubernetes.default.svc:443")
>>> sparkConf.setAppName("pyspark_test")
>>> sparkConf.set("spark.submit.deployMode", "client")
>>> sparkConf.set("spark.executor.instances", 2)
>>> sparkConf.set("spark.kubernetes.container.image", "us.icr.io/testspark/spark:v1")
>>> sparkConf.set("spark.kubernetes.namespace", "anonymous")
>>> sparkConf.set("spark.driver.memory", "1g")
>>> sparkConf.set("spark.executor.memory", "1g")
>>> sparkConf.set("spark.driver.host", "testspark")
>>> sparkConf.set("spark.driver.port", "37771")
>>> sparkConf.set("spark.kubernetes.driver.pod.name", "testspark")
>>> sparkConf.set("spark.driver.bindAddress", "0.0.0.0")
>>>
>>> spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
>>> sc = spark.sparkContext

This starts two new executor pods, but both fail:

satyam@Satyams-MBP ~ % kubectl get pods -n anonymous
NAME                                                              READY   STATUS    RESTARTS   AGE
pysparktest-c1c8f177591feb60-exec-1                               0/2     Error     0          111m
pysparktest-c1c8f177591feb60-exec-2                               0/2     Error     0          111m
testspark                                                         2/2     Running   0          116m

I checked the logs for one of the executor pods and it shows the following error:

satyam@Satyams-MBP ~ % kubectl logs -n anonymous pysparktest-c1c8f177591feb60-exec-1 -c spark-kubernetes-executor 
++ id -u
+ myuid=185
++ id -g
+ mygid=0
+ set +e
++ getent passwd 185
+ uidentry=
+ set -e
+ '[' -z '' ']'
+ '[' -w /etc/passwd ']'
+ echo '185:x:185:0:anonymous uid:/opt/spark:/bin/false'
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sed 's/[^=]*=\(.*\)/\1/g'
+ sort -t_ -k4 -n
+ readarray -t SPARK_EXECUTOR_JAVA_OPTS
+ '[' -n '' ']'
+ '[' '' == 2 ']'
+ '[' '' == 3 ']'
+ '[' -n '' ']'
+ '[' -z ']'
+ case "$1" in
+ shift 1
+ CMD=(${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP)
+ exec /usr/bin/tini -s -- /usr/local/openjdk-8/bin/java -Dio.netty.tryReflectionSetAccessible=true -Dspark.driver.port=37771 -Xms1g -Xmx1g -cp ':/opt/spark/jars/*:' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@testspark:37771 --executor-id 1 --cores 1 --app-id spark-application-1612108001966 --hostname 172.30.174.196
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/01/31 15:46:49 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 14@pysparktest-c1c8f177591feb60-exec-1
21/01/31 15:46:49 INFO SignalUtils: Registered signal handler for TERM
21/01/31 15:46:49 INFO SignalUtils: Registered signal handler for HUP
21/01/31 15:46:49 INFO SignalUtils: Registered signal handler for INT
21/01/31 15:46:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/01/31 15:46:49 INFO SecurityManager: Changing view acls to: 185,root
21/01/31 15:46:49 INFO SecurityManager: Changing modify acls to: 185,root
21/01/31 15:46:49 INFO SecurityManager: Changing view acls groups to: 
21/01/31 15:46:49 INFO SecurityManager: Changing modify acls groups to: 
21/01/31 15:46:49 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(185, root); groups with view permissions: Set(); users  with modify permissions: Set(185, root); groups with modify permissions: Set()
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
	at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:283)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:272)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:302)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$3(CoarseGrainedExecutorBackend.scala:303)
	at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$1(CoarseGrainedExecutorBackend.scala:301)
	at org.apache.spark.deploy.SparkHadoopUtil$anon$1.run(SparkHadoopUtil.scala:62)
	at org.apache.spark.deploy.SparkHadoopUtil$anon$1.run(SparkHadoopUtil.scala:61)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
	... 4 more
Caused by: java.io.IOException: Failed to connect to testspark/172.30.174.253:37771
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:253)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:195)
	at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:204)
	at org.apache.spark.rpc.netty.Outbox$anon$1.call(Outbox.scala:202)
	at org.apache.spark.rpc.netty.Outbox$anon$1.call(Outbox.scala:198)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: testspark/172.30.174.253:37771
Caused by: java.net.ConnectException: Connection refused
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

I have also created a headless service according to the instructions here: https://spark.apache.org/docs/latest/running-on-kubernetes.html#client-mode-networking. Below is the YAML for the service as well as the driver pod:

Service

apiVersion: v1
kind: Service
metadata:
    name: testspark
spec:
    clusterIP: "None"
    selector:
        spark-app-selector: testspark
    ports:
        - name: driver-rpc-port
          protocol: TCP 
          port: 37771
          targetPort: 37771
        - name: blockmanager
          protocol: TCP 
          port: 37772
          targetPort: 37772

Driver Pod

apiVersion: v1
kind: Pod
metadata:
  name: testspark
  labels:
    spark-app-selector: testspark
spec:
  containers:
    - name: testspark
      securityContext:
        runAsUser: 0
      image: jupyter/pyspark-notebook
      ports:
        - containerPort: 37771
      command: ["tail", "-f", "/dev/null"]
  serviceAccountName: default-editor

This should have allowed the executor pods to connect to the driver (which, I checked, has the correct IP 172.30.174.249). To debug the network, I started a shell in the driver container and ran netstat to list the listening ports. Here is the output:

root@testspark:/opt/spark/work-dir# netstat -tulpn
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name    
tcp        0      0 127.0.0.1:15000         0.0.0.0:*               LISTEN      -                   
tcp        0      0 0.0.0.0:15001           0.0.0.0:*               LISTEN      -                   
tcp        0      0 0.0.0.0:15090           0.0.0.0:*               LISTEN      -                   
tcp6       0      0 :::4040                 :::*                    LISTEN      35/java             
tcp6       0      0 :::37771                :::*                    LISTEN      35/java             
tcp6       0      0 :::15020                :::*                    LISTEN      -                   
tcp6       0      0 :::41613                :::*                    LISTEN      35/java 

I also tried to connect to the driver pod on port 37771 via telnet from another running pod in the same namespace, and it was able to connect.

root@test:/# telnet 172.30.174.249 37771
Trying 172.30.174.249...
Connected to 172.30.174.249.
Escape character is '^]'.

I am not sure why my executor pods are not able to connect to the driver on the same port. Am I missing any configuration, or am I doing something wrong? I can supply more information if required.

UPDATE

I created a fake Spark executor image with the following Dockerfile:

FROM us.icr.io/testspark/spark:v1

ENTRYPOINT ["tail", "-f", "/dev/null"]

and passed this image as the spark.kubernetes.container.image config while instantiating the Spark context. I got two running executor pods. I exec'd into one of them with the command kubectl exec -n anonymous -it pysparktest-c1c8f177591feb60-exec-1 -c spark-kubernetes-executor bash and ran /opt/entrypoint.sh executor, and to my surprise the executor could connect to the driver just fine. Here is the log for the same:

++ id -u
+ myuid=185
++ id -g
+ mygid=0
+ set +e
++ getent passwd 185
+ uidentry='185:x:185:0:anonymous uid:/opt/spark:/bin/false'
+ set -e
+ '[' -z '185:x:185:0:anonymous uid:/opt/spark:/bin/false' ']'
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sort -t_ -k4 -n
+ sed 's/[^=]*=\(.*\)/\1/g'
+ readarray -t SPARK_EXECUTOR_JAVA_OPTS
+ '[' -n '' ']'
+ '[' '' == 2 ']'
+ '[' '' == 3 ']'
+ '[' -n '' ']'
+ '[' -z ']'
+ case "$1" in
+ shift 1
+ CMD=(${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP)
+ exec /usr/bin/tini -s -- /usr/local/openjdk-8/bin/java -Dio.netty.tryReflectionSetAccessible=true -Dspark.driver.port=37771 -Xms1g -Xmx1g -cp ':/opt/spark/jars/*:' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@testspark.anonymous.svc.cluster.local:37771 --executor-id 1 --cores 1 --app-id spark-application-1612191192882 --hostname 172.30.174.249
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/02/01 15:00:16 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 39@pysparktest-27b678775e1556d9-exec-1
21/02/01 15:00:16 INFO SignalUtils: Registered signal handler for TERM
21/02/01 15:00:16 INFO SignalUtils: Registered signal handler for HUP
21/02/01 15:00:16 INFO SignalUtils: Registered signal handler for INT
21/02/01 15:00:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/02/01 15:00:17 INFO SecurityManager: Changing view acls to: 185,root
21/02/01 15:00:17 INFO SecurityManager: Changing modify acls to: 185,root
21/02/01 15:00:17 INFO SecurityManager: Changing view acls groups to: 
21/02/01 15:00:17 INFO SecurityManager: Changing modify acls groups to: 
21/02/01 15:00:17 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(185, root); groups with view permissions: Set(); users  with modify permissions: Set(185, root); groups with modify permissions: Set()
21/02/01 15:00:17 INFO TransportClientFactory: Successfully created connection to testspark.anonymous.svc.cluster.local/172.30.174.253:37771 after 173 ms (0 ms spent in bootstraps)
21/02/01 15:00:18 INFO SecurityManager: Changing view acls to: 185,root
21/02/01 15:00:18 INFO SecurityManager: Changing modify acls to: 185,root
21/02/01 15:00:18 INFO SecurityManager: Changing view acls groups to: 
21/02/01 15:00:18 INFO SecurityManager: Changing modify acls groups to: 
21/02/01 15:00:18 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(185, root); groups with view permissions: Set(); users  with modify permissions: Set(185, root); groups with modify permissions: Set()
21/02/01 15:00:18 INFO TransportClientFactory: Successfully created connection to testspark.anonymous.svc.cluster.local/172.30.174.253:37771 after 3 ms (0 ms spent in bootstraps)
21/02/01 15:00:18 INFO DiskBlockManager: Created local directory at /var/data/spark-839bad93-b01c-4bc9-a33f-51c7493775e3/blockmgr-ad6a42b9-cfe2-4cdd-aa28-37a0ab77fb16
21/02/01 15:00:18 INFO MemoryStore: MemoryStore started with capacity 413.9 MiB
21/02/01 15:00:19 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@testspark.anonymous.svc.cluster.local:37771
21/02/01 15:00:19 INFO ResourceUtils: ==============================================================
21/02/01 15:00:19 INFO ResourceUtils: Resources for spark.executor:

21/02/01 15:00:19 INFO ResourceUtils: ==============================================================
21/02/01 15:00:19 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
21/02/01 15:00:19 INFO Executor: Starting executor ID 1 on host 172.30.174.249
21/02/01 15:00:19 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40515.
21/02/01 15:00:19 INFO NettyBlockTransferService: Server created on 172.30.174.249:40515
21/02/01 15:00:19 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/02/01 15:00:19 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(1, 172.30.174.249, 40515, None)
21/02/01 15:00:19 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(1, 172.30.174.249, 40515, None)
21/02/01 15:00:19 INFO BlockManager: Initialized BlockManager: BlockManagerId(1, 172.30.174.249, 40515, None)

I am actually puzzled why this might be happening. Is there any workaround I can try to get this working automatically instead of having to run it manually?

-- Satyam
apache-spark
kubernetes
pyspark

2 Answers

2/2/2021

I was finally able to solve this problem with the help of my colleague. I just added these two configs to disable the Istio sidecar injection, and it started working.

sparkConf.set("spark.kubernetes.driver.annotation.sidecar.istio.io/inject", "false")
sparkConf.set("spark.kubernetes.executor.annotation.sidecar.istio.io/inject", "false")
-- Satyam
Source: StackOverflow

2/1/2021

I don't have much experience with PySpark, but I once set up Java Spark to run on a Kubernetes cluster in client mode, like you are trying now, and I believe the configuration should mostly be the same.

First of all, you should check whether the headless service is working as expected, starting with:

kubectl describe svc -n anonymous testspark

and see if there are any endpoints, and read the whole description. Second, from inside one of your Pods you could check whether nslookup resolves the hostname you are expecting your driver Pod to have:

kubectl exec -n <namespace> -it <pod-name> -- bash    # exec into a Pod which has nslookup
nslookup testspark
nslookup testspark.anonymous

If the names resolve correctly to the driver Pod's current IP address, then it could be something related to the Spark configuration.
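
As an extra sanity check, you could also confirm that the headless Service actually selects the driver Pod by listing its Endpoints (a minimal sketch, using the Service name and namespace from the question):

kubectl get endpoints -n anonymous testspark

If the Endpoints list is empty, the Service selector does not match the driver Pod's labels.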

The only difference I found between your configuration and the one I was using with Java is that for spark.driver.host I was using something like:

service-name.namespace.svc.cluster.local

but, in theory, it should be the same.
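
If you want to try the fully qualified name with the PySpark configuration from the question, it would look roughly like this (a sketch, assuming the testspark Service in the anonymous namespace):

sparkConf.set("spark.driver.host", "testspark.anonymous.svc.cluster.local")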

Another thing that could be useful in debugging the problem is a describe of one of the executor Pods, just to check that the configurations are all correct.
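
For example, using the executor pod name from your kubectl get pods output:

kubectl describe pod -n anonymous pysparktest-c1c8f177591feb60-exec-1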

Edit:

This is the spark-submit command I was using:

sh /opt/spark/bin/spark-submit \
  --master k8s://https://master-ip:6443 \
  --deploy-mode client \
  --name ${POD_NAME} \
  --class "my.spark.application.main.Processor" \
  --conf "spark.executor.instances=2" \
  --conf "spark.kubernetes.namespace=${POD_NAMESPACE}" \
  --conf "spark.kubernetes.driver.pod.name=${POD_NAME}" \
  --conf "spark.kubernetes.container.image=${SPARK_IMAGE}" \
  --conf "spark.kubernetes.executor.request.cores=2" \
  --conf "spark.kubernetes.executor.limit.cores=2" \
  --conf "spark.driver.memory=4g" \
  --conf "spark.executor.cores=2" \
  --conf "spark.executor.memory=4g" \
  --conf "spark.driver.host=${SPARK_DRIVER_HOST}" \
  --conf "spark.ui.dagGraph.retainedRootRDDs=1000" \
  --conf "spark.driver.port=${SPARK_DRIVER_PORT}" \
  --conf "spark.driver.extraJavaOptions=${DRIVER_JAVA_ARGS}" \
  --conf "spark.executor.extraJavaOptions=${EXECUTOR_JAVA_ARGS}" \
  /opt/spark/jars/app.jar -jobConfig ${JOB_CONFIGURATION}

Most of the configurations, like spark.driver.port or spark.driver.host, were essentially the same as yours, just with different names and ports.

-- AndD
Source: StackOverflow