We are running Spark 3.1.1 on Kubernetes in client mode.
We have a simple Scala Spark application that loads Parquet files from S3 and aggregates them:
sparkSession.read.parquet(paths).as[MyRawEvent]
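For context, here is a minimal sketch of the kind of job we run (the case class fields and the aggregation below are simplified placeholders, not our actual code):

import org.apache.spark.sql.SparkSession

// Placeholder event schema; the real MyRawEvent has more fields.
case class MyRawEvent(clientId: Long, date: String, value: Double)

object AggregationJob {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().getOrCreate()
    import sparkSession.implicits._

    // Paths look like s3a://<bucket-name>/client-id=<id>/date=<yyyy-MM-dd>
    val paths: Seq[String] = args.toSeq
    val events = sparkSession.read.parquet(paths: _*).as[MyRawEvent]

    // Simplified aggregation standing in for the real business logic.
    events.groupBy($"clientId").count().show()

    sparkSession.close()
  }
}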
Our application runs perfectly on the happy path: the driver pod starts running, executor pods join the party, and when the application finishes, both the executors and the driver terminate.
On the other hand, if something goes wrong, both the driver and executor pods stay in the Running
state. For instance, an exception occurs in the driver if one of the paths
above doesn't exist:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Path does not exist: s3a://<bucket-name>/client-id=8765432/date=2021-08-06
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:803)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:800)
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Interestingly, this exception doesn't prevent the executors from starting right afterwards, and both the driver and executor pods are stuck forever, doing nothing.
We are not catching exceptions in our application, and we expected the driver and the executors to stop instead of wasting redundant resources.
How can we crash the application so that it won't stay in the Running
state forever?
Well, that was an easy one.
I had to catch all exceptions to ensure that the Spark session is closed no matter what:
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  // some code
  implicit val sparkSession = SparkSession.builder().getOrCreate()
  try {
    // application code with potential exceptions
  } catch {
    case exception: Exception =>
      // close the session before rethrowing so the driver exits with an error
      sparkSession.close()
      throw exception
  }
  sparkSession.close()
}
That way all resources are freed and the driver pod changes its state to Error
as expected.
EDIT - or, in the Scala fashion:
import org.apache.spark.sql.SparkSession
import scala.util.{Failure, Success, Try}

def main(args: Array[String]): Unit = {
  // some code
  implicit val sparkSession = SparkSession.builder().getOrCreate()
  Try {
    // application code with potential exceptions
  } match {
    case Success(_) => ()
    case Failure(exception) =>
      // close the session before rethrowing so the driver exits with an error
      sparkSession.close()
      throw exception
  }
  sparkSession.close()
}
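For completeness, the same guarantee can also be expressed with try/finally (a sketch, not part of the original answer): the session is closed on both success and failure, the catch-and-rethrow disappears, and an exception still propagates so the driver exits with a non-zero code and the pod ends up in the Error state.

import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  // some code
  implicit val sparkSession = SparkSession.builder().getOrCreate()
  try {
    // application code with potential exceptions
  } finally {
    // runs on success and failure alike; a thrown exception still propagates,
    // so the driver exits with a non-zero code and the pod moves to Error
    sparkSession.close()
  }
}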