Spark Streaming on Kubernetes using Kafka

9/28/2021

I am getting the errors below in a Spark Streaming application that uses Kafka as the input DStream source. When using client mode in spark-submit I get the following error:

    21/09/22 17:50:30 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory
    Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedException
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)

But when I run it in cluster mode it throws the following error:

    Caused by: java.lang.SecurityException: java.io.IOException: ./kafka_client_k2h.jaas (No such file or directory)
        at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:137)
        at sun.security.provider.ConfigFile.<init>(ConfigFile.java:102)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.lang.Class.newInstance(Class.java:442)

Code I am using:

    instream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Extract the record keys and print them on the driver
        val rdd2 = rdd.map(record => record.key().toString)
        println(rdd2.collect().mkString(", "))
      } else {
        println("records not found")
      }
    }
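
For reference, `instream` is a Kafka direct DStream. A minimal sketch of how it is typically created with the spark-streaming-kafka-0-10 integration is below; the broker address, batch interval, and consumer settings are placeholders rather than my exact configuration, and the real job additionally sets the SASL/SSL properties that rely on the JAAS file and truststore shipped via `--files`:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    // Placeholder streaming context; the batch interval here is illustrative only
    val ssc = new StreamingContext(new SparkConf().setAppName("ukafka-test2"), Seconds(10))

    // Placeholder Kafka settings; the actual job also sets SASL/SSL properties
    // (security.protocol, ssl.truststore.location, etc.) that depend on the JAAS file
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kafka-broker:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "agb_con_grp3",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Subscribe to the input topic and build the direct stream used above
    val instream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Array("transactions"), kafkaParams)
    )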

The spark-submit command I used:

    $SPARK_HOME/bin/spark-submit \
      --verbose \
      --master $MASTER \
      --deploy-mode client \
      --name ukafka-test2 \
      --class com.test.Controller \
      --conf spark.kubernetes.container.image=$IMAGE \
      --conf spark.kubernetes.driver.podTemplateFile=/home/test/spark-driver-pod-template.yaml \
      --conf spark.kubernetes.executor.podTemplateFile=/home/test/spark-executor-pod-template.yaml \
      --conf spark.kubernetes.driver.limit.cores=1 \
      --conf spark.executor.instances=1 \
      --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
      --conf spark.kubernetes.namespace=cn-paas-dev \
      --conf spark.kubernetes.file.upload.path=hdfs://testEnv/tmp \
      --conf spark.eventLog.enabled=true \
      --conf spark.eventLog.dir=hdfs://testEnv:8020/scaas/shs_logs/ \
      --conf spark.driver.extraJavaOptions="-Djava.security.auth.login.config=kafka_client.jaas" \
      --conf spark.executor.extraJavaOptions="-Djava.security.auth.login.config=kafka_client.jaas" \
      --files file:///home/test/kafka_client.jaas,file:///home/test/kafka.keytab,file:///home/test/sapd115_public.jks \
      /home/poc-1.0-jar-with-dependencies.jar "transactions" "globals" "earliest" "ab_consumer" "agb_con_grp3" "sapd115_public.jks"
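
For completeness, `kafka_client.jaas` is a standard Kafka client JAAS login configuration. Assuming Kerberos (SASL/GSSAPI) authentication with the `kafka.keytab` shipped via `--files`, it looks roughly like this (the principal and realm below are placeholders, not the actual values):

    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="./kafka.keytab"
        principal="ab_consumer@EXAMPLE.COM";
    };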

Any idea why it is throwing these errors?

-- Sahil Arora
apache-kafka
apache-spark
kubernetes
spark-streaming

0 Answers