How to write an Airflow DAG using KubernetesPodOperator to submit Spark jobs

5/21/2021

I have stumbled upon so many answers about how to write an Airflow DAG that uses KubernetesPodOperator to submit Spark jobs. After struggling for days without finding the proper syntax, I somehow got it working. Posting an answer here so that others don't have to struggle as much :)

-- Prateek Dubey
airflow
apache-spark
kubernetes

1 Answer

5/21/2021

The Airflow DAG below uses KubernetesPodOperator to submit a Spark job that reads its PySpark script from Ceph. The same syntax should work if your PySpark script is in AWS S3; in that case you don't need to pass fs.s3a.endpoint, fs.s3a.connection.*, or fs.s3a.path.*.
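
For example, if the script lived in AWS S3 instead of Ceph, the s3a-related portion of the spark-submit arguments in the DAG below could shrink to roughly this (a sketch; the bucket name is a placeholder):

                '--conf', 'spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.EnvironmentVariableCredentialsProvider',
                's3a://<s3_bucket>/Ceph_PySpark_Read_Data.py'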

# Airflow DEMO DAG

from airflow import DAG
from datetime import timedelta, datetime
from kubernetes.client import models as k8s
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
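# Note: this contrib import path is for Airflow 1.10.x; on Airflow 2.x the
# operator is provided by the cncf.kubernetes provider package instead
# (airflow.providers.cncf.kubernetes.operators.kubernetes_pod)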

args = {
    "owner": "prateek.dubey",
    "email": ["<your_email_id>"],
    "depends_on_past": False,
    "start_date": datetime(2019, 1, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5)
}

# catchup is a DAG-level setting, so it goes on the DAG itself rather than in default_args
dag = DAG(dag_id='kubernetes_sample_dag', default_args=args, schedule_interval=None, catchup=False)

ceph_data_read = KubernetesPodOperator(
        namespace='airflow',
        image='spark-executor-3.0.1',
        image_pull_policy='Always',
        image_pull_secrets=[k8s.V1LocalObjectReference('gcr')],
        service_account_name='spark',
        name='prateek-ceph-data-read',
        task_id='ceph_data_read',
        in_cluster=True,
        get_logs=True,
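        # No cmds are set, so these arguments are handed to the container
        # image's entrypoint (assuming the Spark image passes them through
        # and executes spark-submit); replace the <api_server_host>,
        # <ceph_endpoint_url> and <ceph_bucket> placeholders for your cluster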
        arguments=[
                '/opt/spark/bin/spark-submit',
                '--master', 'k8s://https://<api_server_host>:6443',
                '--deploy-mode', 'cluster',
                '--name', 'prateek-ceph-data-read',
                '--conf', 'spark.kubernetes.driver.pod.name=prateek-ceph-data-read',
                '--conf', 'spark.kubernetes.executor.podNamePrefix=prateek-ceph-data-read',
                '--conf', 'spark.kubernetes.namespace=airflow',
                '--conf', 'spark.kubernetes.container.image=spark-executor-3.0.1',
                '--conf', 'spark.kubernetes.container.image.pullPolicy=Always',
                '--conf', 'spark.kubernetes.container.image.pullSecrets=gcr',
                '--conf', 'spark.kubernetes.authenticate.driver.serviceAccountName=spark',
                '--conf', 'spark.kubernetes.authenticate.executor.serviceAccountName=spark',
                '--conf', 'spark.kubernetes.authenticate.submission.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt',
                '--conf', 'spark.kubernetes.authenticate.submission.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token',
                '--conf', 'spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.EnvironmentVariableCredentialsProvider',
                '--conf', 'spark.hadoop.fs.s3a.endpoint=http://<ceph_endpoint_url>:8080',
                '--conf', 'spark.hadoop.fs.s3a.connection.ssl.enabled=false',
                '--conf', 'spark.hadoop.fs.s3a.path.style.access=true',
                '--conf', 'spark.executor.instances=2',
                '--conf', 'spark.executor.cores=6',
                '--conf', 'spark.executor.memory=55g',
                's3a://<ceph_bucket>/Ceph_PySpark_Read_Data.py'
            ],
        dag=dag
    )

ceph_data_read
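
For completeness, Ceph_PySpark_Read_Data.py can be any ordinary PySpark script; the s3a settings are supplied by the spark-submit confs above. A minimal sketch (the input file name is a placeholder, not from the original job):

# Ceph_PySpark_Read_Data.py - minimal sketch; bucket and file names are placeholders
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('prateek-ceph-data-read').getOrCreate()

# Read a CSV from the same s3a bucket and print a few rows
df = spark.read.option('header', 'true').csv('s3a://<ceph_bucket>/<input_file>.csv')
df.show(10)

spark.stop()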
-- Prateek Dubey
Source: StackOverflow