1 vote

I have a DAG with one DataflowTemplateOperator that can deal with different json files. When I trigger the DAG I pass some parameters via {{dag_run.conf['param1']}}, and that works fine.
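For context, my setup looks roughly like this (the import path, template path and parameter name below are simplified placeholders, not my exact values):

from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator

read_json = DataflowTemplateOperator(
    task_id="df_operator_read_object_json_file",
    template="gs://my-bucket/templates/my_template",
    # parameters is a templated field, so the Jinja expression is rendered per run
    parameters={"inputFile": "{{ dag_run.conf['param1'] }}"},
)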

The issue I have is trying to rename the task_id based on param1.

i.e. task_id="df_operator_read_object_json_file_{{dag_run.conf['param1']}}",

it complains that only alphanumeric characters are allowed. And with

task_id="df_operator_read_object_json_file_{}".format(dag_run.conf['param1']), it does not recognise dag_run, on top of the same alphanumeric issue.

The whole idea behind this is that when I look at the Dataflow jobs console and a job has failed, I know who the offender is based on param1. Dataflow job names are based on the task_id, like this:

df-operator-read-object-json-file-8b9eecec

and what I need is this:

df-operator-read-object-param1-json-file-8b9eecec

Any ideas if this is possible?

You mean that for every run your DAG will have a different task_id based on your json file? - Anand Vidvat
param1 can only take 5 different values; the task id should be a string + param1 - Altons
Is this a scheduled DAG, or is it only triggered manually with manual input for each run? - Elad
As soon as a file lands in the bucket, the DAG is triggered by a Cloud Function which passes all the parameters required to run the DF job - Altons
This means that if you invoke the function 1000 times it will create 1000 tasks, each of which runs once and never runs again? That is not good practice for Airflow. DataflowTemplatedJobStartOperator (the updated operator) has a job_name param which you can configure as you like. You don't need to bind the task_id to the job_name - Elad

1 Answer

2 votes

There is no need to generate a new operator per file. DataflowTemplatedJobStartOperator has a job_name parameter which is also templated, so it can be used with Jinja.

I didn't test it, but this should work:

from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator

op = DataflowTemplatedJobStartOperator(
    task_id="df_operator_read_object_json_file",
    # job_name is a templated field, so the Jinja expression is rendered per run
    job_name="df_operator_read_object_json_file_{{ dag_run.conf['param1'] }}",
    template="gs://dataflow-templates/your_template",
    location="europe-west3",
)
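If you want to verify the rendering without waiting for the Cloud Function, you can trigger the DAG manually with a conf payload, for example (Airflow 2 CLI; the dag id and value are just examples):

airflow dags trigger your_dag_id --conf '{"param1": "some_value"}'

The rendered job_name then carries param1, so a failed job is easy to spot in the Dataflow console.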