I'm trying to submit a Spark 2.3 job to a Kubernetes cluster in Scala using the Play framework.
I have also tried it as a simple Scala program without Play.
The job gets submitted to the k8s cluster, but stateChanged and infoChanged are never invoked. I also want to be able to get the handle.getAppId.
I'm submitting the job with spark-submit, as described here:
$ bin/spark-submit \
--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.container.image=<spark-image> \
local:///path/to/examples.jar
Here is the code for the job:
def index = Action {
  try {
    val spark = new SparkLauncher()
      .setMaster("my k8 apiserver host")
      .setVerbose(true)
      .addSparkArg("--verbose")
      .setMainClass("myClass")
      .setAppResource("hdfs://server/inputs/my.jar")
      .setConf("spark.app.name", "myapp")
      .setConf("spark.executor.instances", "5")
      .setConf("spark.kubernetes.container.image", "mydockerimage")
      .setDeployMode("cluster")
      .startApplication(new SparkAppHandle.Listener() {
        def infoChanged(handle: SparkAppHandle): Unit = {
          System.out.println("Spark App Id ["
            + handle.getAppId
            + "] Info Changed. State ["
            + handle.getState + "]")
        }
        def stateChanged(handle: SparkAppHandle): Unit = {
          System.out.println("Spark App Id ["
            + handle.getAppId
            + "] State Changed. State ["
            + handle.getState + "]")
          if (handle.getState.toString == "FINISHED") System.exit(0)
        }
      })
    Ok(spark.getState().toString())
  } catch {
    case NonFatal(e) =>
      println("failed with exception: " + e)
  }
  Ok
}
SparkLauncher
allows you to run a spark-submit
command programmatically. It launches spark-submit as a separate child process of the JVM. You need to wait in your client's main function until the driver is launched in K8s and the listener callbacks arrive. Otherwise the JVM's main thread exits, killing the client without reporting anything.
-----------------------                       -----------------------
|      User App       |     spark-submit      |      Spark App      |
|                     |  -------------------> |                     |
|         ------------|                       |-------------        |
|         |           |        hello          |            |        |
|         | L. Server |<----------------------| L. Backend |        |
|         |           |                       |            |        |
|         -------------                       -----------------------
|               |                                        ^
|               v                                        |
|        -------------|                                  |
|        |            |     <per-app channel>            |
|        | App Handle |<----------------------------------
|        |            |
-----------------------
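The wait-for-callback pattern itself is plain JVM concurrency. Here is a minimal, Spark-free sketch of it; the background thread and the `AppState` type are hypothetical stand-ins for the launcher server and the `stateChanged` callbacks it delivers:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

object LatchSketch {
  // Stand-in for SparkAppHandle.State: only the isFinal flag matters here.
  sealed trait AppState { def isFinal: Boolean }
  case object Running  extends AppState { def isFinal = false }
  case object Finished extends AppState { def isFinal = true }

  /** Blocks the calling thread until a simulated listener delivers a final state. */
  def awaitFinal(): AppState = {
    val latch = new CountDownLatch(1)
    val lastState = new AtomicReference[AppState](Running)

    // Background thread plays the role of the launcher server
    // firing stateChanged callbacks; with Spark, these come from the handle.
    new Thread(() => {
      Seq(Running, Finished).foreach { s =>
        lastState.set(s)
        if (s.isFinal) latch.countDown() // release the waiter on a final state
      }
    }).start()

    // The caller parks here instead of exiting and killing the child.
    latch.await(5, TimeUnit.SECONDS)
    lastState.get()
  }

  def main(args: Array[String]): Unit =
    println(s"final state: ${awaitFinal()}")
}
```

Without the `latch.await(...)`, `main` would return immediately and the process would die before any callback fires, which is exactly the symptom described in the question.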
I have added a j.u.c.CountDownLatch
that keeps the main thread from exiting until appState.isFinal
is reached.
import java.util.concurrent.CountDownLatch
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

// Named differently from the SparkLauncher class to avoid shadowing the import.
object SparkPiLauncher {
  def main(args: Array[String]): Unit = {
    val countDownLatch = new CountDownLatch(1)
    val launcher = new SparkLauncher()
      .setMaster("k8s://http://127.0.0.1:8001")
      .setAppResource("local:/{PATH}/spark-examples_2.11-2.3.0.jar")
      .setConf("spark.app.name", "spark-pi")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setConf("spark.executor.instances", "5")
      .setConf("spark.kubernetes.container.image", "spark:spark-docker")
      .setConf("spark.kubernetes.driver.pod.name", "spark-pi-driver")
      .setDeployMode("cluster")
      .startApplication(new SparkAppHandle.Listener() {
        def infoChanged(handle: SparkAppHandle): Unit = {}
        def stateChanged(handle: SparkAppHandle): Unit = {
          val appState = handle.getState()
          println(s"Spark App Id [${handle.getAppId}] State Changed. State [${handle.getState}]")
          if (appState != null && appState.isFinal) {
            countDownLatch.countDown() // release the latch once the driver reaches a final state
          }
        }
      })
    countDownLatch.await() // keep the main thread alive until the Spark driver exits
  }
}
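For reference, the launcher API ships in its own Maven artifact, separate from spark-core. A minimal sbt line, assuming the version matches the 2.3.0 examples jar used above:

```scala
// spark-launcher is published as its own artifact (spark-launcher_2.11 for Spark 2.3)
libraryDependencies += "org.apache.spark" %% "spark-launcher" % "2.3.0"
```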