I am trying to run a simple spark job on a kubernetes cluster. I deployed a pod that starts a pyspark shell and in that shell I am changing the spark configuration as specified below:
>>> sc.stop()
>>> sparkConf = SparkConf()
>>> sparkConf.setMaster("k8s://https://kubernetes.default.svc:443")
>>> sparkConf.setAppName("pyspark_test")
>>> sparkConf.set("spark.submit.deployMode", "client")
>>> sparkConf.set("spark.executor.instances", 2)
>>> sparkConf.set("spark.kubernetes.container.image", "us.icr.io/testspark/spark:v1")
>>> sparkConf.set("spark.kubernetes.namespace", "anonymous")
>>> sparkConf.set("spark.driver.memory", "1g")
>>> sparkConf.set("spark.executor.memory", "1g")
>>> sparkConf.set("spark.driver.host", "testspark")
>>> sparkConf.set("spark.driver.port", "37771")
>>> sparkConf.set("spark.kubernetes.driver.pod.name", "testspark")
>>> sparkConf.set("spark.driver.bindAddress", "0.0.0.0")
>>>
>>> spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
>>> sc = spark.sparkContext
This starts two new executor pods but both fails:
satyam@Satyams-MBP ~ % kubectl get pods -n anonymous
NAME READY STATUS RESTARTS AGE
pysparktest-c1c8f177591feb60-exec-1 0/2 Error 0 111m
pysparktest-c1c8f177591feb60-exec-2 0/2 Error 0 111m
testspark 2/2 Running 0 116m
I checked logs for one of the executor pod and it shows following error:
satyam@Satyams-MBP ~ % kubectl logs -n anonymous pysparktest-c1c8f177591feb60-exec-1 -c spark-kubernetes-executor
++ id -u
+ myuid=185
++ id -g
+ mygid=0
+ set +e
++ getent passwd 185
+ uidentry=
+ set -e
+ '[' -z '' ']'
+ '[' -w /etc/passwd ']'
+ echo '185:x:185:0:anonymous uid:/opt/spark:/bin/false'
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sed 's/[^=]*=\(.*\)/\1/g'
+ sort -t_ -k4 -n
+ readarray -t SPARK_EXECUTOR_JAVA_OPTS
+ '[' -n '' ']'
+ '[' '' == 2 ']'
+ '[' '' == 3 ']'
+ '[' -n '' ']'
+ '[' -z ']'
+ case "$1" in
+ shift 1
+ CMD=(${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP)
+ exec /usr/bin/tini -s -- /usr/local/openjdk-8/bin/java -Dio.netty.tryReflectionSetAccessible=true -Dspark.driver.port=37771 -Xms1g -Xmx1g -cp ':/opt/spark/jars/*:' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@testspark:37771 --executor-id 1 --cores 1 --app-id spark-application-1612108001966 --hostname 172.30.174.196
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/01/31 15:46:49 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 14@pysparktest-c1c8f177591feb60-exec-1
21/01/31 15:46:49 INFO SignalUtils: Registered signal handler for TERM
21/01/31 15:46:49 INFO SignalUtils: Registered signal handler for HUP
21/01/31 15:46:49 INFO SignalUtils: Registered signal handler for INT
21/01/31 15:46:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/01/31 15:46:49 INFO SecurityManager: Changing view acls to: 185,root
21/01/31 15:46:49 INFO SecurityManager: Changing modify acls to: 185,root
21/01/31 15:46:49 INFO SecurityManager: Changing view acls groups to:
21/01/31 15:46:49 INFO SecurityManager: Changing modify acls groups to:
21/01/31 15:46:49 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(185, root); groups with view permissions: Set(); users with modify permissions: Set(185, root); groups with modify permissions: Set()
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:283)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:272)
at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:302)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$3(CoarseGrainedExecutorBackend.scala:303)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
at scala.collection.immutable.Range.foreach(Range.scala:158)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$1(CoarseGrainedExecutorBackend.scala:301)
at org.apache.spark.deploy.SparkHadoopUtil$anon$1.run(SparkHadoopUtil.scala:62)
at org.apache.spark.deploy.SparkHadoopUtil$anon$1.run(SparkHadoopUtil.scala:61)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
... 4 more
Caused by: java.io.IOException: Failed to connect to testspark/172.30.174.253:37771
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:253)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:195)
at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:204)
at org.apache.spark.rpc.netty.Outbox$anon$1.call(Outbox.scala:202)
at org.apache.spark.rpc.netty.Outbox$anon$1.call(Outbox.scala:198)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: testspark/172.30.174.253:37771
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
I have also created a headless service according to the instruction here https://spark.apache.org/docs/latest/running-on-kubernetes.html#client-mode-networking. Below is the yaml for the service as well as the driver pod:
Service
apiVersion: v1
kind: Service
metadata:
name: testspark
spec:
clusterIP: "None"
selector:
spark-app-selector: testspark
ports:
- name: driver-rpc-port
protocol: TCP
port: 37771
targetPort: 37771
- name: blockmanager
protocol: TCP
port: 37772
targetPort: 37772
Driver Pod
apiVersion: v1
kind: Pod
metadata:
name: testspark
labels:
spark-app-selector: testspark
spec:
containers:
- name: testspark
securityContext:
runAsUser: 0
image: jupyter/pyspark-notebook
ports:
- containerPort: 37771
command: ["tail", "-f", "/dev/null"]
serviceAccountName: default-editor
This should have allowed executor pods to connect to the driver (which, I checked have the correct ip 172.30.174.249). To debug the network, I started a shell in the driver container and netstat the listening ports. Here is the output for the same:
root@testspark:/opt/spark/work-dir# netstat -tulpn
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 127.0.0.1:15000 0.0.0.0:* LISTEN -
tcp 0 0 0.0.0.0:15001 0.0.0.0:* LISTEN -
tcp 0 0 0.0.0.0:15090 0.0.0.0:* LISTEN -
tcp6 0 0 :::4040 :::* LISTEN 35/java
tcp6 0 0 :::37771 :::* LISTEN 35/java
tcp6 0 0 :::15020 :::* LISTEN -
tcp6 0 0 :::41613 :::* LISTEN 35/java
I also tried to connect to the driver pod on the port 37771 via telnet from another running pod on the same namespace and it was able to connect.
root@test:/# telnet 172.30.174.249 37771
Trying 172.30.174.249...
Connected to 172.30.174.249.
Escape character is '^]'.
I am not sure why my executor pods are not able to connect to the driver on the same port. Am I missing any configuration or am I doing anything wrong? I can supply more information if required.
UPDATE
I created a fake spark executor image with the following docker file:
FROM us.icr.io/testspark/spark:v1
ENTRYPOINT ["tail", "-f", "/dev/null"]
and passed this image as spark.kubernetes.container.image
config while instantiating the spark context. I got two running executor pods. I exec into one of them with command kubectl exec -n anonymous -it pysparktest-c1c8f177591feb60-exec-1 -c spark-kubernetes-executor bash
and ran the following command /opt/entrypoint.sh executor
and to my surprise, executor could connect with the driver just fine. Here is the stack trace for the same:
++ id -u
+ myuid=185
++ id -g
+ mygid=0
+ set +e
++ getent passwd 185
+ uidentry='185:x:185:0:anonymous uid:/opt/spark:/bin/false'
+ set -e
+ '[' -z '185:x:185:0:anonymous uid:/opt/spark:/bin/false' ']'
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sort -t_ -k4 -n
+ sed 's/[^=]*=\(.*\)/\1/g'
+ readarray -t SPARK_EXECUTOR_JAVA_OPTS
+ '[' -n '' ']'
+ '[' '' == 2 ']'
+ '[' '' == 3 ']'
+ '[' -n '' ']'
+ '[' -z ']'
+ case "$1" in
+ shift 1
+ CMD=(${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP)
+ exec /usr/bin/tini -s -- /usr/local/openjdk-8/bin/java -Dio.netty.tryReflectionSetAccessible=true -Dspark.driver.port=37771 -Xms1g -Xmx1g -cp ':/opt/spark/jars/*:' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@testspark.anonymous.svc.cluster.local:37771 --executor-id 1 --cores 1 --app-id spark-application-1612191192882 --hostname 172.30.174.249
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/02/01 15:00:16 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 39@pysparktest-27b678775e1556d9-exec-1
21/02/01 15:00:16 INFO SignalUtils: Registered signal handler for TERM
21/02/01 15:00:16 INFO SignalUtils: Registered signal handler for HUP
21/02/01 15:00:16 INFO SignalUtils: Registered signal handler for INT
21/02/01 15:00:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/02/01 15:00:17 INFO SecurityManager: Changing view acls to: 185,root
21/02/01 15:00:17 INFO SecurityManager: Changing modify acls to: 185,root
21/02/01 15:00:17 INFO SecurityManager: Changing view acls groups to:
21/02/01 15:00:17 INFO SecurityManager: Changing modify acls groups to:
21/02/01 15:00:17 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(185, root); groups with view permissions: Set(); users with modify permissions: Set(185, root); groups with modify permissions: Set()
21/02/01 15:00:17 INFO TransportClientFactory: Successfully created connection to testspark.anonymous.svc.cluster.local/172.30.174.253:37771 after 173 ms (0 ms spent in bootstraps)
21/02/01 15:00:18 INFO SecurityManager: Changing view acls to: 185,root
21/02/01 15:00:18 INFO SecurityManager: Changing modify acls to: 185,root
21/02/01 15:00:18 INFO SecurityManager: Changing view acls groups to:
21/02/01 15:00:18 INFO SecurityManager: Changing modify acls groups to:
21/02/01 15:00:18 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(185, root); groups with view permissions: Set(); users with modify permissions: Set(185, root); groups with modify permissions: Set()
21/02/01 15:00:18 INFO TransportClientFactory: Successfully created connection to testspark.anonymous.svc.cluster.local/172.30.174.253:37771 after 3 ms (0 ms spent in bootstraps)
21/02/01 15:00:18 INFO DiskBlockManager: Created local directory at /var/data/spark-839bad93-b01c-4bc9-a33f-51c7493775e3/blockmgr-ad6a42b9-cfe2-4cdd-aa28-37a0ab77fb16
21/02/01 15:00:18 INFO MemoryStore: MemoryStore started with capacity 413.9 MiB
21/02/01 15:00:19 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@testspark.anonymous.svc.cluster.local:37771
21/02/01 15:00:19 INFO ResourceUtils: ==============================================================
21/02/01 15:00:19 INFO ResourceUtils: Resources for spark.executor:
21/02/01 15:00:19 INFO ResourceUtils: ==============================================================
21/02/01 15:00:19 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
21/02/01 15:00:19 INFO Executor: Starting executor ID 1 on host 172.30.174.249
21/02/01 15:00:19 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40515.
21/02/01 15:00:19 INFO NettyBlockTransferService: Server created on 172.30.174.249:40515
21/02/01 15:00:19 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/02/01 15:00:19 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(1, 172.30.174.249, 40515, None)
21/02/01 15:00:19 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(1, 172.30.174.249, 40515, None)
21/02/01 15:00:19 INFO BlockManager: Initialized BlockManager: BlockManagerId(1, 172.30.174.249, 40515, None)
I am actually puzzeled why this might be happening. Is there any work around I can try to get this thing working automatically instead of me having to run it manually?
I was finally able to solve this problem with the help of my colleague. I just added these two configs to disable the istio sidecar injection and it started working.
sparkConf.set("spark.kubernetes.driver.annotation.sidecar.istio.io/inject", "false")
sparkConf.set("spark.kubernetes.executor.annotation.sidecar.istio.io/inject", "false")
I don't have much experience with PySpark but I once setup Java Spark to run on a Kubernetes cluster in client mode, like you are trying now.. and I believe the configuration should mostly be the same.
First of all, you should check if the headless service is working as expected or not. First with a:
kubectl describe -n anonymous testspark
and see if there are any endpoints and the whole description. Second, from inside one of your Pods you could check if nslookup resolve the hostname you are expecting your driver Pod to have.
kubectl exec -n <namespace> -it <pod-name> -- bash // exec into a Pod which have nslookup
nslookup testspark
nslookup testspark.testspark
if the names do resolve correctly to the driver Pod current ip address, then it could be something related to the Spark configuration.
The only difference I found between your configuration and the one I was using with java, is that as spark.driver.host I was using something like:
service-name.namespace.svc.cluster.local
but, in theory, it should be the same.
Another thing which could be useful in debugging the problem, is a describe of one of the executors Pods, just to check that the configuration are all correct.
Edit:
This is the spark-submit command I was using:
sh /opt/spark/bin/spark-submit --master k8s://https://master-ip:6443 --deploy-mode client --name ${POD_NAME} --class "my.spark.application.main.Processor" --conf "spark.executor.instances=2" --conf "spark.kubernetes.namespace=${POD_NAMESPACE}" --conf "spark.kubernetes.driver.pod.name=${POD_NAME}" --conf "spark.kubernetes.container.image=${SPARK_IMAGE}" --conf "spark.kubernetes.executor.request.cores=2" --conf "spark.kubernetes.executor.limit.cores=2" --conf "spark.driver.memory=4g" --conf "spark.executor.cores=2" --conf "spark.executor.memory=4g" --conf "spark.driver.host=${SPARK_DRIVER_HOST}" --conf "spark.ui.dagGraph.retainedRootRDDs=1000" --conf "spark.driver.port=${SPARK_DRIVER_PORT}" --conf "spark.driver.extraJavaOptions=${DRIVER_JAVA_ARGS}" --conf "spark.executor.extraJavaOptions=${EXECUTOR_JAVA_ARGS}" /opt/spark/jars/app.jar -jobConfig ${JOB_CONFIGURATION}
And most of the configurations like spark_driver_port or spark_driver_host were completely similar to yours, just different because names and ports were different.