
Context

I'm trying to build an ingestion pipeline on Google Cloud Platform using Composer, DataProc and BigQuery. I have a table in BigQuery that contains one record per data source and its corresponding file. So if there are 5 files to ingest, the table has 5 records; tomorrow the number of files could be different. Hence, I decided to build the tasks dynamically within my DAG.

The high level design is as follows:

  • Execute a function to fetch data from BigQuery as a Pandas dataframe (or dict, either is fine)
  • Iterate over the dataframe
  • For each row in the dataframe, create a DataProcSparkOperator with details about the file and corresponding args

This setup runs fine. I can see my DAG and all dynamically generated tasks in Airflow UI.

Edit: Adding a few more details. The BigQuery table will have fewer than 25 records, so querying the table is not a concern in itself; querying it every 30 seconds is. Secondly, I only need this DAG to run once every 4 hours or so. I do not intend to keep Composer running for that time: every 4 hours, all I need is to boot Composer, run the DAG once to process all available files, and then shut it down.
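To put the two frequencies side by side: if the DAG file is re-parsed every 30 seconds (the figure above; the real interval depends on the Airflow/Composer configuration), the top-level BigQuery query runs far more often than the DAG itself. The back-of-the-envelope sketch below additionally assumes BigQuery's on-demand minimum of 10 MB billed per referenced table per query, so even a 25-row table is billed as 10 MB each time. It is a worst case: cached query results are not billed, and the scheduler only parses while Composer is up.

```python
# Back-of-the-envelope comparison of parse-time queries vs. intended DAG runs.
# Assumptions (not measured): the DAG file is parsed every 30 seconds, and every
# uncached query is billed at BigQuery's on-demand minimum of 10 MB.
SECONDS_PER_DAY = 24 * 60 * 60
PARSE_INTERVAL_S = 30          # assumed file-processing interval
RUN_INTERVAL_H = 4             # how often the DAG is actually meant to run
MIN_BILLED_MB_PER_QUERY = 10   # assumed on-demand minimum per referenced table


def daily_counts(parse_interval_s=PARSE_INTERVAL_S, run_interval_h=RUN_INTERVAL_H):
    """Return (queries issued by top-level code, intended DAG runs) per day."""
    queries = SECONDS_PER_DAY // parse_interval_s
    runs = 24 // run_interval_h
    return queries, runs


queries, runs = daily_counts()
billed_mb = queries * MIN_BILLED_MB_PER_QUERY
print(f"{queries} parse-time queries vs {runs} intended runs per day")
print(f"worst case (no query cache): {billed_mb} MB billed per day")
```

With these assumptions that is 2880 queries for 6 intended runs per day, which is why moving the query out of the parse path matters more than the table size.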

The problem

While these DataProc tasks are executing, after a couple of minutes Airflow refreshes the DAG and runs the same set of tasks again. In the DataProc Jobs console, I see 2 (sometimes 3) instances of the same task in the running state. This is undesirable.

What I have tried

I have set retries=0 at the task level and on the DAG I have set catchup=False, max_active_runs=1, and schedule_interval='@once'. Default arguments for the DAG also have retries=0.

I think the issue is because the part where I am pulling records from BigQuery is part of an ordinary function, rather than being a task in itself. The reason I have not put that in a task is because I couldn't find a solution to pass fetched result from BigQuery into subsequent tasks where I have to loop over them.
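This suspicion can be checked without Airflow: the scheduler re-imports (parses) the DAG file repeatedly, and any function called at module level runs on every parse, not once per DAG run. The sketch below is a hypothetical stand-in: `DAG_FILE_SOURCE` plays the role of the DAG file, and `simulate_scheduler_parses` re-executes it the way a parsing loop would. No real Airflow APIs are involved.

```python
# Stand-in for a DAG file: top-level code that "queries BigQuery" and builds tasks.
# Nothing here touches Airflow; exec() mimics the scheduler re-importing the file.
DAG_FILE_SOURCE = """
def fetch_pending_files_from_bq():
    QUERY_LOG.append("SELECT ...")          # pretend BigQuery query
    return ["file_a.csv", "file_b.csv"]

tasks = ["ingest_" + name for name in fetch_pending_files_from_bq()]
"""


def simulate_scheduler_parses(n_parses):
    """Execute the DAG source n_parses times; return (query log, last task list)."""
    query_log = []
    namespace = {}
    for _ in range(n_parses):
        namespace = {"QUERY_LOG": query_log}   # fresh module namespace per parse
        exec(DAG_FILE_SOURCE, namespace)
    return query_log, namespace.get("tasks", [])


log, tasks = simulate_scheduler_parses(3)
print(len(log), tasks)  # 3 ['ingest_file_a.csv', 'ingest_file_b.csv']
```

Three parses mean three queries, even though the DAG itself has not run at all.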

I tried calling a PythonOperator and executing Variable.set("df", df) inside it in the hope that I can loop over Variable.get("df") but that didn't work out either.
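Two things likely work against the `Variable` approach. Airflow Variables are stored as strings, so without `Variable.set(key, value, serialize_json=True)` a DataFrame is likely saved as its printed form; serializing something like `df.to_dict("records")` as JSON and reading it back with `Variable.get(key, deserialize_json=True)` avoids that. More importantly, the `PythonOperator` only writes the Variable when the task runs, while the loop over `Variable.get("df")` runs at parse time, so the generated task list lags behind and each parse still reads the metadata DB. The sketch below models that ordering with a plain dict standing in for the Variable store; `variable_store`, `run_fetch_task` and `parse_dag_file` are hypothetical names, not Airflow APIs.

```python
import json

# Hypothetical stand-in for Airflow's Variable table: values are stored as strings.
variable_store = {}


def run_fetch_task(rows):
    """What a PythonOperator would do at *run* time: serialize rows to JSON."""
    variable_store["pending_files"] = json.dumps(rows)


def parse_dag_file():
    """What the DAG file does at *parse* time: read the Variable, build task ids."""
    raw = variable_store.get("pending_files", "[]")   # empty until the task has run
    return ["ingest_" + row["file_name"] for row in json.loads(raw)]


# The first parse happens before any run, so no tasks are generated yet.
first = parse_dag_file()
run_fetch_task([{"file_name": "file_a"}, {"file_name": "file_b"}])
# Only parses that happen after the task has run see the new rows.
second = parse_dag_file()
print(first, second)  # [] ['ingest_file_a', 'ingest_file_b']
```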

Sharing relevant code below.

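from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import DataProcSparkOperator
from airflow.operators.dummy_operator import DummyOperator

# dagid, dataproc_args and dataproc_jars are defined elsewhere in the full code (not shown).
# Assumed definition of yesterday: midnight of the previous day.
yesterday = datetime.combine(datetime.today() - timedelta(days=1), datetime.min.time())


def fetch_pending_files_from_bq():
    # fetch records from BigQuery and return as dataframe
    ...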

default_args = {
    'start_date': yesterday,
    'default_timezone': 'utc',
    'retries': 0
}

dag = DAG(
    dagid,
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    description='DAG to ingest data',
    schedule_interval='@once'
)

start_dag = DummyOperator(task_id="start_dag", dag=dag)
end_dag = DummyOperator(task_id="end_dag", dag=dag)

pending_files_df = fetch_pending_files_from_bq()

for index, row in pending_files_df.iterrows():
    # One Spark job per pending file; the file name doubles as the task id.
    task_id = row["file_name"]
    task = DataProcSparkOperator(
        dag=dag,
        task_id=task_id,
        arguments=dataproc_args,
        region="us-east1",
        job_name="job_{}".format(task_id),
        dataproc_spark_jars=dataproc_jars,
        ....
        ....
    )

    task.set_upstream(start_dag)
    task.set_downstream(end_dag)
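Because each `task_id` (and the Dataproc job name built from it) comes straight from a file name, it may need sanitizing. As far as I know, Airflow task IDs may contain only letters, digits, underscores, dots and hyphens, and Dataproc job IDs only letters, digits, underscores and hyphens (up to 100 characters), so a name like `sales 2020.csv` could be rejected. The helpers below are a hypothetical sketch of one way to normalize and deduplicate names; the `job_` prefix mirrors the loop above, and the 90-character cap is an assumption that leaves room for any suffix the operator appends.

```python
import re


def to_task_id(file_name):
    """Replace anything other than ASCII letters, digits, _ . - with _."""
    return re.sub(r"[^A-Za-z0-9_.-]", "_", file_name)


def to_job_name(task_id, max_len=90):
    """Dataproc job IDs allow only letters, digits, _ and -; cap length (assumed)."""
    return re.sub(r"[^A-Za-z0-9_-]", "_", "job_{}".format(task_id))[:max_len]


def unique_ids(file_names):
    """Map file names to task IDs, appending _2, _3, ... until each ID is unique."""
    used, result = set(), []
    for name in file_names:
        base = candidate = to_task_id(name)
        n = 2
        while candidate in used:
            candidate = "{}_{}".format(base, n)
            n += 1
        used.add(candidate)
        result.append(candidate)
    return result


print(unique_ids(["sales 2020.csv", "sales 2020.csv", "orders.csv"]))
print(to_job_name("sales_2020.csv"))
```

In the loop, the deduplicated ID would then feed both `task_id=` and `job_name=to_job_name(task_id)`.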

I get the orchestration I want; the only issue is that my DataProc jobs get re-run automatically.

Any ideas are appreciated.

Airflow tasks should be as static as possible (slowly changing is OK). Creating tasks from a DB table that updates constantly isn't a good idea. It also puts a heavy load on your DB connection, and specifically for BigQuery it may mean paying a lot of money. The fetch_pending_files_from_bq function is executed every 30 seconds (the default of min_file_process_interval), so you are querying BigQuery every 30 seconds. Depending on the amount of data you scan, this may result in a high bill. - Elad
@Elad In terms of BigQuery, this table will have fewer than 25 records, so the cost of the queries wouldn't be too much. I should also add that I will probably execute this DAG only once every 3-4 hours. It's true, though, that I don't want to query BQ every 30 secs. What other options do I have? Even if I query BQ, write the results to disk and then loop over that file, won't that write be triggered every 30 secs too? - Rajat
Probably you will need another ETL that exports BQ to storage. In any case, this line is strange: job_name="job_{}".format(task_id). task_id is not a defined variable. Are you sure that's your real code? Please also provide images/logs that show the problem you described of "unrecognized" tasks appearing. - Elad
@Elad - this is my code, but it's not the full code since I cannot post it publicly. task_id is indeed a valid parameter, and the code works fine. Let me try to grab some screenshots when I run it today. - Rajat

1 Answer


While diving deeper into the design, I realized that fetch_pending_files_from_bq is not a task, so it is executed every time the DAG file is parsed (refreshed). That led to repeated queries and also caused the unexpected creation of duplicate tasks. Hence, I dropped this design.

I was able to fix this using subdags. The first subdag reads from BQ and writes to GCS. The second subdag reads the file from GCS and creates tasks dynamically.
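The fix boils down to one idea: the expensive BigQuery lookup runs once, as a task, and the DAG file only reads a small, already-materialized file when it is parsed. The sketch below illustrates that idea without Airflow or GCS: `export_pending_files` stands in for the first SubDAG (BigQuery to GCS), `load_task_specs` for the parse-time read in the second, and a local JSON file replaces the GCS object. All names are hypothetical, and the SubDAG wiring itself is not shown.

```python
import json
import os
import tempfile


def export_pending_files(rows, path):
    """Stage 1 (runs as a task): persist the BigQuery result.
    A local JSON file stands in for the GCS object here (assumption)."""
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(rows, f)
    os.replace(tmp_path, path)  # atomic swap so a parse never sees a half-written file


def load_task_specs(path):
    """Stage 2 (runs at parse time): read the small file, no BigQuery query."""
    if not os.path.exists(path):
        return []  # nothing exported yet: generate no ingestion tasks
    with open(path) as f:
        rows = json.load(f)
    return [
        {"task_id": row["file_name"], "job_name": "job_{}".format(row["file_name"])}
        for row in rows
    ]


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "pending_files.json")
    before = load_task_specs(path)
    export_pending_files([{"file_name": "file_a"}, {"file_name": "file_b"}], path)
    after = load_task_specs(path)
print(before, [spec["task_id"] for spec in after])  # [] ['file_a', 'file_b']
```

Each parse now costs one small file read instead of a BigQuery query, and the task list only changes when the export task writes a new file.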