How can I run a PySpark job on k8s?

3/4/2020

I'm trying to run a hello world Spark application on a k8s cluster. I've built my own Docker image, adding the script on top of a standard PySpark Docker image, and now I'm trying to run this image on the k8s cluster, but I get the following error. The DNS pods' logs look okay.

My current Dockerfile:

FROM semenchukou/spark-py:v2.4.1
COPY . /app
WORKDIR /app
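
I build and push the image so the cluster nodes can pull it (the tag matches the one in the submit command below):

docker build -t semenchukou/pyspark-k8s-example:likeEx3 .
docker push semenchukou/pyspark-k8s-example:likeEx3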

The command I'm using to deploy the job on k8s:

bin/spark-submit \
      --master k8s://https://172.20.234.174:6443 \
      --deploy-mode cluster \
      --conf spark.executor.instances=2 \
      --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
      --conf spark.kubernetes.container.image=semenchukou/pyspark-k8s-example:likeEx3 \
      --name spark_k8s_hello_world_0 \
      --conf spark.kubernetes.pyspark.pythonVersion=3 \
      local:///app/HelloWorldSpark.py
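
For reference, the spark service account used above has to exist; the standard RBAC setup from the Spark on Kubernetes docs (assuming everything runs in the default namespace) is:

kubectl create serviceaccount spark
kubectl create clusterrolebinding spark-role --clusterrole=edit \
  --serviceaccount=default:spark --namespace=default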

Error:

Traceback (most recent call last):
  File "/app/HelloWorldSpark.py", line 10, in <module>
    .appName("PythonPi")\
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 173, in getOrCreate
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367, in getOrCreate
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 136, in __init__
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 198, in _do_init
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 306, in _initialize_context
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1525, in __call__
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: External scheduler cannot be instantiated
        at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$createTaskScheduler(SparkContext.scala:2794)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:493)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:238)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]  for kind: [Pod]  with name: [sparkk8shelloworld0-1583151334880-driver]  in namespace: [default]  failed.
        at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
        at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
        at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:229)
        at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:162)
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$anonfun$1.apply(ExecutorPodsAllocator.scala:57)
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$anonfun$1.apply(ExecutorPodsAllocator.scala:55)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.<init>(ExecutorPodsAllocator.scala:55)
        at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:89)
        at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$createTaskScheduler(SparkContext.scala:2788)
        ... 13 more
Caused by: java.net.UnknownHostException: kubernetes.default.svc: Try again
        at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)
        at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929)
        at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324)
        at java.net.InetAddress.getAllByName0(InetAddress.java:1277)
        at java.net.InetAddress.getAllByName(InetAddress.java:1193)
        at java.net.InetAddress.getAllByName(InetAddress.java:1127)
        at okhttp3.Dns$1.lookup(Dns.java:39)
        at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171)
        at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:137)
        at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:82)
        at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:171)
        at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
        at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
        at io.fabric8.kubernetes.client.utils.BackwardsCompatibilityInterceptor.intercept(BackwardsCompatibilityInterceptor.java:119)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
        at io.fabric8.kubernetes.client.utils.ImpersonatorInterceptor.intercept(ImpersonatorInterceptor.java:68)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
        at io.fabric8.kubernetes.client.utils.HttpClientUtils.lambda$createHttpClient$3(HttpClientUtils.java:110)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
        at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
        at okhttp3.RealCall.execute(RealCall.java:69)
        at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:404)
        at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:365)
        at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleGet(OperationSupport.java:330)
        at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleGet(OperationSupport.java:311)
        at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleGet(BaseOperation.java:810)
        at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:218)
        ... 20 more

What am I doing wrong?

The HelloWorld script:

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .appName("PythonEx")\
        .getOrCreate()
    # read a text file from HDFS and take its first line
    txt = spark.sparkContext.textFile('hdfs://172.20.234.174:1515/testing/testFile.txt')
    first = txt.first()
    # parallelize() on a bare string would distribute its characters,
    # so wrap the line in a list to write it back as a single record
    spark.sparkContext.parallelize([first]).saveAsTextFile('hdfs://172.20.234.174:9000/testing/result.txt')
    spark.stop()
-- semenchukou
apache-spark
kubernetes
pyspark
python
spark-submit

1 Answer

3/4/2020

Your error stack trace says:

Caused by: java.net.UnknownHostException: kubernetes.default.svc: Try again

which means that for some reason the kubernetes Service is missing from the default namespace, or that you have DNS-related problems in your cluster.
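
You can verify both from outside the driver pod. A minimal check (assuming you have kubectl access and that cluster DNS runs as kube-dns/CoreDNS in kube-system; busybox:1.28 is just a convenient image that ships nslookup):

# the Service the driver is trying to resolve
kubectl get svc kubernetes -n default

# cluster DNS pods and service
kubectl get pods -n kube-system -l k8s-app=kube-dns
kubectl get svc kube-dns -n kube-system

# try the same lookup from inside the cluster
kubectl run dns-test --rm -it --image=busybox:1.28 --restart=Never -- nslookup kubernetes.default.svc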

Also, this issue has already been discussed with you here.

-- Aliaksandr Sasnouskikh
Source: StackOverflow