I am getting the errors below in a Spark Streaming application that uses Kafka as the input DStream. When I run spark-submit in client mode, I get the following error:
21/09/22 17:50:30 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory
Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedException
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
But when I run it in cluster mode, it throws the following error:
Caused by: java.lang.SecurityException: java.io.IOException: ./kafka_client_k2h.jaas (No such file or directory)
    at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:137)
    at sun.security.provider.ConfigFile.<init>(ConfigFile.java:102)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
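Code I am using:
instream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // Extract the key of every Kafka record as a String
    val keys = rdd.map(record => record.key().toString)
    // Collect to the driver and print each key
    // (println on the collected array would only print its reference)
    keys.collect().foreach(println)
  } else {
    println("records not found")
  }
}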
The spark-submit command I used:
$SPARK_HOME/bin/spark-submit \
--verbose \
--master $MASTER \
--deploy-mode client \
--name ukafka-test2 \
--class com.test.Controller \
--conf spark.kubernetes.container.image=$IMAGE \
--conf spark.kubernetes.driver.podTemplateFile=/home/test/spark-driver-pod-template.yaml \
--conf spark.kubernetes.executor.podTemplateFile=/home/test/spark-executor-pod-template.yaml \
--conf spark.kubernetes.driver.limit.cores=1 \
--conf spark.executor.instances=1 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.namespace=cn-paas-dev \
--conf spark.kubernetes.file.upload.path=hdfs://testEnv/tmp \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=hdfs://testEnv:8020/scaas/shs_logs/ \
--conf spark.driver.extraJavaOptions="-Djava.security.auth.login.config=kafka_client.jaas" \
--conf spark.executor.extraJavaOptions="-Djava.security.auth.login.config=kafka_client.jaas" \
--files file:///home/test/kafka_client.jaas,file:///home/test/kafka.keytab,file:///home/test/sapd115_public.jks \
/home/poc-1.0-jar-with-dependencies.jar "transactions" "globals" "earliest" "ab_consumer" "agb_con_grp3" "sapd115_public.jks"
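One thing that may be worth checking in the command above: the JAAS file is referenced by a relative name in `-Djava.security.auth.login.config`, so that name has to match one of the files listed in `--files`. The cluster-mode error mentions `./kafka_client_k2h.jaas`, which is not among the files listed in the command shown here (that run may have used different options). Below is a minimal sketch of such a pre-submit check. `jaas_is_shipped` is a hypothetical helper written for illustration, not a Spark feature, and it only compares file names; it does not verify that the files exist or where Spark places them.

```shell
# Hypothetical helper (not part of Spark): succeeds when the base name of $1
# matches the base name of one of the comma-separated --files entries in $2.
jaas_is_shipped() {
  wanted=$(basename "$1")
  old_ifs=$IFS
  IFS=','
  for entry in $2; do
    if [ "$(basename "$entry")" = "$wanted" ]; then
      IFS=$old_ifs
      return 0
    fi
  done
  IFS=$old_ifs
  return 1
}

# Value copied from the --files option of the spark-submit command above.
FILES="file:///home/test/kafka_client.jaas,file:///home/test/kafka.keytab,file:///home/test/sapd115_public.jks"

# Name used in the extraJavaOptions of the command above.
if jaas_is_shipped "kafka_client.jaas" "$FILES"; then
  echo "kafka_client.jaas is listed in --files"
fi

# Name reported in the cluster-mode SecurityException.
if ! jaas_is_shipped "./kafka_client_k2h.jaas" "$FILES"; then
  echo "kafka_client_k2h.jaas is NOT listed in --files"
fi
```

If the second check fires for the options actually used in the cluster-mode run, the name in `-Djava.security.auth.login.config` and the name in `--files` are out of sync.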
Any idea why it is throwing these errors?