I have stumbled upon so many answers about how to write an Airflow DAG that uses KubernetesPodOperator to submit Spark jobs. After struggling for days without finding the proper syntax, I somehow got it working. Posting an answer here so that others don't have to struggle as much :)

The Airflow DAG below uses KubernetesPodOperator to submit a Spark job that reads its PySpark script from Ceph. The same syntax should work if your PySpark script is in AWS S3; in that case you don't need to pass fs.s3a.endpoint, fs.s3a.connection.* and fs.s3a.path.* (see the note after the DAG).
# Airflow DEMO DAG
from datetime import datetime, timedelta

from airflow import DAG
from kubernetes.client import models as k8s
# Airflow 1.10.x import path; on Airflow 2.x the operator lives in the cncf.kubernetes provider:
# from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

args = {
    "owner": "prateek.dubey",
    "email": ["<your_email_id>"],
    "depends_on_past": False,
    "start_date": datetime(2019, 1, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5)
}

# catchup is a DAG argument, not a default_arg, so set it on the DAG itself
dag = DAG(dag_id='kubernetes_sample_dag', default_args=args, schedule_interval=None, catchup=False)
ceph_data_read = KubernetesPodOperator(
    namespace='airflow',
    image='spark-executor-3.0.1',
    image_pull_policy='Always',
    image_pull_secrets=[k8s.V1LocalObjectReference('gcr')],
    service_account_name='spark',
    name='prateek-ceph-data-read',
    task_id='ceph_data_read',
    in_cluster=True,
    get_logs=True,
    # No cmds given, so these arguments are handed to the image's entrypoint,
    # which is expected to exec spark-submit
    arguments=[
        '/opt/spark/bin/spark-submit',
        '--master', 'k8s://https://<api_server_host>:6443',
        '--deploy-mode', 'cluster',
        '--name', 'prateek-ceph-data-read',
        # Driver/executor pod settings
        '--conf', 'spark.kubernetes.driver.pod.name=prateek-ceph-data-read',
        '--conf', 'spark.kubernetes.executor.podNamePrefix=prateek-ceph-data-read',
        '--conf', 'spark.kubernetes.namespace=airflow',
        '--conf', 'spark.kubernetes.container.image=spark-executor-3.0.1',
        '--conf', 'spark.kubernetes.container.image.pullPolicy=Always',
        '--conf', 'spark.kubernetes.container.image.pullSecrets=gcr',
        # Authenticate against the Kubernetes API with the 'spark' service account
        '--conf', 'spark.kubernetes.authenticate.driver.serviceAccountName=spark',
        '--conf', 'spark.kubernetes.authenticate.executor.serviceAccountName=spark',
        '--conf', 'spark.kubernetes.authenticate.submission.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt',
        '--conf', 'spark.kubernetes.authenticate.submission.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token',
        # S3A settings for Ceph; for AWS S3 drop the endpoint, ssl and path-style confs
        '--conf', 'spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.EnvironmentVariableCredentialsProvider',
        '--conf', 'spark.hadoop.fs.s3a.endpoint=http://<ceph_endpoint_url>:8080',
        '--conf', 'spark.hadoop.fs.s3a.connection.ssl.enabled=false',
        '--conf', 'spark.hadoop.fs.s3a.path.style.access=true',
        # Executor sizing
        '--conf', 'spark.executor.instances=2',
        '--conf', 'spark.executor.cores=6',
        '--conf', 'spark.executor.memory=55g',
        # The PySpark application itself, read from the Ceph bucket
        's3a://<ceph_bucket>/Ceph_PySpark_Read_Data.py'
    ],
    dag=dag
)
ceph_data_read
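If your script is in AWS S3 instead of Ceph, the only S3A conf you really need is the credentials provider; the endpoint, SSL and path-style settings fall back to the S3A defaults. A minimal sketch, assuming the AWS keys are exposed to the driver/executor pods as AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables:

# Trimmed S3A block for AWS S3 (replaces the three Ceph-specific confs above)
s3a_confs = [
    '--conf', 'spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.EnvironmentVariableCredentialsProvider',
]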
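And for completeness, a minimal sketch of what a PySpark script like Ceph_PySpark_Read_Data.py could look like (the dataset path is just a placeholder, not from my actual setup):

# Ceph_PySpark_Read_Data.py -- minimal sketch of a PySpark job fetched via s3a
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("prateek-ceph-data-read").getOrCreate()

# Read a CSV from the same S3A-compatible store (Ceph RGW or AWS S3)
df = spark.read.option("header", "true").csv("s3a://<ceph_bucket>/<your_dataset>.csv")
df.printSchema()
print(df.count())

spark.stop()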