Spark driver doesn't crash on exception

8/7/2021

We are running Spark 3.1.1 on Kubernetes in client mode.

We have a simple Scala Spark application that loads parquet files from S3 and aggregates them:

sparkSession.read.parquet(paths).as[MyRawEvent]
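
For context, the surrounding code looks roughly like this (MyRawEvent's fields and the aggregation are simplified placeholders, not the real schema):

import org.apache.spark.sql.SparkSession

// Simplified placeholder for the real event schema
case class MyRawEvent(clientId: Long, eventType: String, timestamp: Long)

val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._

// paths: Seq[String] of s3a:// partition directories
val events = sparkSession.read.parquet(paths: _*).as[MyRawEvent]

// Placeholder aggregation - the real one differs
val aggregated = events.groupBy($"clientId", $"eventType").count()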

Our application runs perfectly on the happy path: the driver pod starts running, executor pods join the party, and when the application finishes, both the executors and the driver terminate.

On the other hand, if something goes wrong, both the driver and executor pods stay in the Running state. For instance, an exception is thrown in the driver if one of the paths above doesn't exist:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Path does not exist: s3a://<bucket-name>/client-id=8765432/date=2021-08-06
     at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:803)
     at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:800)
     at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
     at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
     at scala.util.Success.$anonfun$map$1(Try.scala:255)
     at scala.util.Success.map(Try.scala:213)
     at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
     at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
     at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
     at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(Unknown Source)
     at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
     at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
     at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
     at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)

Interestingly, this exception doesn't prevent the executors from starting right after, and both the driver and executor pods are stuck forever, doing nothing.
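
(Side note: for this particular trigger we could filter out missing paths up front with the Hadoop FileSystem API - a rough sketch, where existingPaths is a hypothetical helper - but that only avoids this one exception and doesn't answer the general question below.)

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

// Hypothetical helper: keep only the input paths that actually exist on S3,
// so read.parquet doesn't throw AnalysisException for a missing partition
def existingPaths(paths: Seq[String])(implicit spark: SparkSession): Seq[String] = {
  val hadoopConf = spark.sparkContext.hadoopConfiguration
  paths.filter { p =>
    val path = new Path(p)
    path.getFileSystem(hadoopConf).exists(path)
  }
}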

We are not catching exceptions in our application, and we expected the driver and the executors to stop instead of wasting resources.

How can we crash the application so it won't stay in the Running state forever?

-- ItayB
amazon-s3
apache-spark
kubernetes
parquet
scala

1 Answer

8/8/2021

Well, that was an easy one.

I had to catch all exceptions to ensure that the SparkSession (and its underlying context) is closed no matter what:

  import org.apache.spark.sql.SparkSession

  def main(args: Array[String]): Unit = {
    // some code
    implicit val sparkSession = SparkSession.builder().getOrCreate()
    try {
      // application code with potential exceptions
    } catch {
      case exception: Exception =>
        // close the session so executors are released, then rethrow so the
        // driver exits with a non-zero code and the pod moves to Error state
        sparkSession.close()
        throw exception
    }

    sparkSession.close()
  }

That way all resources are freed and the driver pod changes its state to Error as expected.

EDIT - or in a more idiomatic Scala fashion:

  import org.apache.spark.sql.SparkSession
  import scala.util.{Failure, Success, Try}

  def main(args: Array[String]): Unit = {
    // some code
    implicit val sparkSession = SparkSession.builder().getOrCreate()
    Try {
      // application code with potential exceptions
    } match {
      case Success(_) => ()
      case Failure(exception) =>
        // close the session before rethrowing so the driver exits with a
        // non-zero code and the pod is marked as Error
        sparkSession.close()
        throw exception
    }

    sparkSession.close()
  }
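
For completeness, a try/finally achieves the same guarantee (the session is always closed and the exception still propagates, so the driver exits non-zero) with a single close() call - a minimal sketch:

  import org.apache.spark.sql.SparkSession

  def main(args: Array[String]): Unit = {
    implicit val sparkSession = SparkSession.builder().getOrCreate()
    try {
      // application code with potential exceptions
    } finally {
      // runs on both success and failure; any exception keeps propagating
      sparkSession.close()
    }
  }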
-- ItayB
Source: StackOverflow