I get warnings and errors when I run Structured Streaming on a K8s cluster. Here is part of my code:
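```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._ // provides $"..." and the String encoder used by .map

// Kafka source: key and value arrive as binary columns
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load()

val query = df.writeStream
  .outputMode("append")
  .foreachBatch((batchDF: DataFrame, batchId: Long) => {
    // Decode each Avro value to a string, then parse those strings as JSON
    val rstDF = batchDF.select($"value")
      .map(row => valueDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("value"), topicValueAvroSchema).toString)
      .transform(runner.spark.read.json)
      .transform(trimDF)
    println(s"Batch $batchId: ${rstDF.count} rows")
    rstDF.show(false)
  })
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .start()

query.awaitTermination()
```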
The first batch (batch 0) runs fine. However, when data arrives for batch 1, I get warnings about lost tasks caused by a java.lang.NullPointerException:
```
...
19/10/12 02:02:18 ERROR TaskSetManager: Task 0 in stage 2.0 failed 4 times; aborting job
...
19/10/12 02:02:18 INFO DAGScheduler: ResultStage 2 (start at MergeKafkaToDelta.scala:124) failed in 17.980 s due to Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 4, 172.30.170.218, executor 3): java.lang.NullPointerException
...
19/10/12 02:02:18 ERROR MicroBatchExecution: Query [id = e1f15e44-ad17-452d-97cf-def26f729f38, runId = c0b7c2ba-fca4-4538-8095-cbe2daeec525] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 4, 172.30.170.218, executor 3): java.lang.NullPointerException
...
```
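A NullPointerException thrown only inside executor tasks often means an object captured by the task closure arrived there with a null field. As a hypothesis only (the truncated trace does not confirm it): if `valueDeserializer`, or something it depends on, lives in a `@transient` field, Java serialization skips that field and it is null on the executor. The sketch below uses plain Java serialization instead of Spark, with hypothetical `Decoder` and `Holder` classes, to show that a `@transient val` stays null after the round trip while a `@transient lazy val` is rebuilt on first access (Scala 2 behavior).

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable client such as an Avro deserializer
class Decoder {
  def decode(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

// Hypothetical object that a task closure captures and ships to executors
class Holder extends Serializable {
  // Skipped by serialization and never re-created: null after deserialization
  @transient val eager: Decoder = new Decoder
  // Also skipped, but its transient init flag resets, so it is rebuilt on first access
  @transient lazy val lazyDecoder: Decoder = new Decoder
}

object TransientDemo {
  // Simulates shipping an object from the driver to an executor via Java serialization
  def roundTrip[T <: Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new Holder)
    println(copy.eager == null)                              // true
    println(copy.lazyDecoder.decode("ok".getBytes("UTF-8"))) // ok
  }
}
```

The usual remedy for this pattern is the lazy variant, or creating the client inside the task (for example once per partition), so each executor builds its own instance instead of receiving a null one.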
Do you know the root cause? How should I set up the configuration and parameters for spark-submit? I found a blog post related to this: Spark Streaming Checkpointing on Kubernetes
However, it relies on a specific cloud platform. :-( Is there a general solution?
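The blog post is about checkpointing. In Structured Streaming the checkpoint directory is set per query through the `checkpointLocation` option, and the Spark docs require an HDFS-compatible path, so on Kubernetes it has to point at storage that outlives the driver pod, such as an object store or a volume mounted via the `spark.kubernetes.driver.volumes.persistentVolumeClaim.[VolumeName].*` submit options (available since Spark 2.4). This is a sketch only: the path `/mnt/checkpoints/merge-kafka`, the object `CheckpointedQuery`, and the simplified batch handler are hypothetical, and the volume itself still has to be mounted at submit time.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object CheckpointedQuery {
  // Hypothetical path: assumed to be durable, HDFS-compatible storage
  // (e.g. a PVC mount or an object store) reachable from the driver pod
  val CheckpointDir = "/mnt/checkpoints/merge-kafka"

  // Explicitly typed function value passed to foreachBatch
  val handleBatch: (DataFrame, Long) => Unit = (batchDF, batchId) =>
    println(s"Batch $batchId: ${batchDF.count()} rows")

  def start(spark: SparkSession, brokerServers: String, topicName: String): StreamingQuery =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerServers)
      .option("subscribe", topicName)
      .load()
      .writeStream
      .outputMode("append")
      // Offsets and commit logs are written here, so a restarted query
      // resumes after the last committed micro-batch
      .option("checkpointLocation", CheckpointDir)
      .foreachBatch(handleBatch)
      .trigger(Trigger.ProcessingTime("120 seconds"))
      .start()
}
```

Passing a typed `(DataFrame, Long) => Unit` value rather than an inline lambda is a small design choice: it sidesteps the `foreachBatch` overload ambiguity that some Scala 2.12 builds report.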
It seems that spark-on-k8s-operator is a solution: Spark Structured Streaming Applications on Kubernetes. There is also a new version for Spark 2.4.4: spark-on-k8s-operator
So I'm working with it and trying to build an example for Structured Streaming.