Airflow with Kubernetes Executor - Error: Read-only file system: '/airflow/dags/git/test.csv'

10/15/2019

I am trying to write the results of a table intersection to a csv file via Airflow (which is installed inside Kubernetes), but I get an error saying that the file system is read-only. Is there any parameter I can change in the script so that it can write the result?

import csv
import numpy as np
from google.cloud import bigquery

def update_table():
    # Connecting to BigQuery
    client = bigquery.Client()
    query = """SELECT ltrim(rtrim(col1)) as col1,
                 sum(col2) as col2
                 FROM dataset.table
                 GROUP BY 1
                 ORDER BY col1 desc """
    job = client.query(query)
    df_tst = job.to_dataframe()

    # Connecting to BigQuery 
    query_mer = """SELECT distinct col1 FROM dataset.table2 """
    mer_job = client.query(query_mer)
    df_mer = mer_job.to_dataframe()

    # Comparing both tables

    nomes = df_tst.col1.tolist()
    # categorizacao_merchants
    nomes_mer = df_mer.col1.tolist()
    lista = list(np.setdiff1d(nomes, nomes_mer))

    # Open the file once and write every row; no explicit close is
    # needed inside a with block
    with open('/airflow/dags/git/test.csv', 'a', newline='') as f:
        writer = csv.writer(f, delimiter=';')
        for x in lista:
            writer.writerow([x])

with DAG('update_cat', default_args=default_args, description='Python DAG',
         schedule_interval='0 0 * * 0', start_date=airflow.utils.dates.days_ago(0),
         catchup=False) as dag:
    python_task = PythonOperator(task_id='python_task', python_callable=update_table)
-- Felipe FB
airflow
kubernetes
python

1 Answer

1/24/2020

It is very un-airflow-esque to write local files during a task. In general, tasks should read from sources and write back to locations that are independent of the container and task itself, and multiple executions of the same task should not further alter the source or destination data. This preserves idempotency: the same task running with the same input should always generate the same output. Airflow also doesn't support directly passing large data from one task to another, so you need some intermediary data storage.
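For instance, here is a minimal sketch of that idea using Google Cloud Storage as the intermediary store; the bucket name, object path, and the use of the google-cloud-storage client are assumptions, not part of the original code or answer:

    # Build the CSV in memory and upload it to GCS instead of writing to the
    # read-only DAG folder. Bucket and object names below are placeholders.
    import csv
    import io
    from google.cloud import storage

    def upload_diff(lista):
        buf = io.StringIO()
        writer = csv.writer(buf, delimiter=';')
        for x in lista:
            writer.writerow([x])

        client = storage.Client()
        bucket = client.bucket('my-bucket')        # assumed bucket name
        blob = bucket.blob('exports/test.csv')     # assumed object path
        blob.upload_from_string(buf.getvalue(), content_type='text/csv')

The task then stays stateless: each run reads from BigQuery and overwrites a single object in GCS, so rerunning it produces the same result.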

-- Benoit Rainville
Source: StackOverflow