I am trying to write the result of a table comparison to a CSV file via Airflow (which is installed inside Kubernetes), but I get an error saying the CSV file is read-only. Is there any parameter I can change in the script so that it can write the result?
import csv
import numpy as np
from google.cloud import bigquery

def update_table():
    # Connect to BigQuery
    client = bigquery.Client()
    query = """SELECT ltrim(rtrim(col1)) as col1,
                      sum(col2) as col2
               FROM dataset.table
               GROUP BY 1
               ORDER BY col1 desc"""
    job = client.query(query)
    df_tst = job.to_dataframe()

    # Second query: distinct merchants (merchant categorization)
    query_mer = """SELECT distinct col1 FROM dataset.table2"""
    mer_job = client.query(query_mer)
    df_mer = mer_job.to_dataframe()

    # Compare both tables: names in df_tst that are missing from df_mer
    nomes = df_tst.col1.tolist()
    nomes_mer = df_mer.col1.tolist()
    lista = list(np.setdiff1d(nomes, nomes_mer))

    # Append one row per missing name
    with open('/airflow/dags/git/test.csv', 'a', newline='') as f:
        writer = csv.writer(f, delimiter=';')
        for x in lista:
            writer.writerow([x])
with DAG('update_cat',
         default_args=default_args,
         description='Python DAG',
         schedule_interval='0 0 * * 0',
         start_date=airflow.utils.dates.days_ago(0),
         catchup=False) as dag:

    python_task = PythonOperator(task_id='python_task', python_callable=update_table)
It is very un-airflow-esque to write local files during a task. In general, tasks should read from sources and write to destinations that are independent of the container the task runs in, and multiple executions of the same task should not keep altering the source or destination data. This preserves idempotency: the same task running with the same input should always produce the same output.

Airflow also doesn't support passing data directly from one task to another, so you need some intermediary data storage that lives outside the task's container.
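For example, instead of appending to a file under /airflow/dags (which is typically a read-only git-sync volume), the task could push the result to external storage such as a GCS bucket. Here is a minimal sketch of the write-back step only, assuming a hypothetical bucket name my-staging-bucket and that the google-cloud-storage package is available in the worker image; everything up to building lista stays as in your code:

from google.cloud import storage

def write_result(lista):
    # Build the CSV content in memory instead of appending to a local file
    csv_body = "\n".join(str(x) for x in lista)

    # Overwrite a fixed object in GCS so re-running the task always leaves
    # the destination in the same state (idempotent), rather than appending
    bucket = storage.Client().bucket("my-staging-bucket")  # hypothetical bucket name
    blob = bucket.blob("update_cat/test.csv")
    blob.upload_from_string(csv_body, content_type="text/csv")

A downstream task (for example one using an operator such as GCSToBigQueryOperator) can then pick the file up from the bucket, and re-runs simply replace the object instead of appending duplicate rows.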