SparkSubmitOperator stops following Apache Spark job?

2/17/2022

I have an Airflow DAG with a SparkSubmitOperator that successfully triggers a PySpark job on an Apache Spark cluster; however, the job stays in a running state indefinitely until it is manually killed on the cluster or marked as failed in Airflow. The job appears to run into trouble with its executors: it continuously grants and then removes them. Both Airflow and Apache Spark are running on Kubernetes.

CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove non-existent executor 6
[2022-02-17 21:39:18,708] {spark_submit.py:514} INFO - 22/02/17 21:39:18 INFO StandaloneSchedulerBackend: Granted executor ID app-20220217213913-0068/8 on hostPort <someip:port> with 1 core(s), 1024.0 MiB RAM
[2022-02-17 21:39:18,717] {spark_submit.py:514} INFO - 22/02/17 21:39:18 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20220217213913-0068/8 is now RUNNING
[2022-02-17 21:39:18,951] {spark_submit.py:514} INFO - 22/02/17 21:39:18 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20220217213913-0068/7 is now EXITED (Command exited with code 1)
[2022-02-17 21:39:18,951] {spark_submit.py:514} INFO - 22/02/17 21:39:18 INFO StandaloneSchedulerBackend: Executor app-20220217213913-0068/7 removed: Command exited with code 1
[2022-02-17 21:39:18,951] {spark_submit.py:514} INFO - 22/02/17 21:39:18 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20220217213913-0068/9 on worker-20220210201025-<someip:port> (<someip:port>) with 1 core(s)
[2022-02-17 21:39:18,951] {spark_submit.py:514} INFO - 22/02/17 21:39:18 INFO BlockManagerMasterEndpoint: Trying to remove executor 7 from BlockManagerMaster.
[2022-02-17 21:39:18,951] {spark_submit.py:514} INFO - 22/02/17 21:39:18 INFO BlockManagerMaster: Removal of executor 7 requested

The Airflow task and the Spark config associated with it look like this:

import os
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

spark_config = {
    'conn_id': 'spark_cluster',
    'application': '/opt/airflow/dags/repo/pyspark_test.py',
    'num_executors': 1,
    'executor_cores': 1,
    'executor_memory': '1g',
    'driver_memory': '1g',
    'conf': {
        'spark.executorEnv.JAVA_HOME': '/opt/bitnami/java',
        'spark.kubernetes.appMasterEnv.JAVA_HOME': '/opt/bitnami/java/',
        'spark.driver.host': os.environ['HOSTNAME'],
    },
}

with DAG(
    dag_id='spark-operator',
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False
    ) as dag:

    submit_job = SparkSubmitOperator(
        task_id='spark_submit_job',
        **spark_config
    )

    submit_job

So far I have followed this post and tried disabling dynamic allocation in the spark_config (see the sketch after the BashOperator snippet below), but got the same end result. Additionally, I used Airflow's BashOperator to trigger the same job, and that triggers the job successfully and returns when the cluster finishes it, marking the Airflow task as successful.

    airflow_cluster_task = BashOperator(
        task_id='airflow_cluster_task',
        bash_command='/home/airflow/.local/bin/spark-submit --master=spark://<url>:<port> --conf spark.driver.host=$(hostname -i) /opt/airflow/dags/repo/pyspark_test.py',
    )
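
For reference, "disabling dynamic allocation" here means adding the standard Spark property to the operator's conf dict, roughly as sketched below (a sketch of the change, not necessarily verbatim what I ran):

spark_config['conf'].update({
    # standard Spark setting for turning dynamic allocation off
    'spark.dynamicAllocation.enabled': 'false',
})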

Does the SparkSubmitOperator in Airflow behave differently, or am I missing some form of configuration? One difference I can see is in spark.driver.host: $HOSTNAME is set to the pod name of whatever pod the task is running on, while the BashOperator command passes the pod IP via $(hostname -i).
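
Roughly, the two values compare like this (a sketch; the socket lookup is a stand-in for the hostname -i shell command):

import os
import socket

# what the SparkSubmitOperator config passes as spark.driver.host: the pod name
print(os.environ['HOSTNAME'])

# approximately what the BashOperator command passes via $(hostname -i): the pod IP
print(socket.gethostbyname(socket.gethostname()))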

-- anm
airflow
apache-spark
kubernetes
pyspark
python

0 Answers