
Problem

Airflow tasks of the type DataflowTemplateOperator take a long time to complete, which means other tasks can be blocked by them (correct?).

When we run more of these tasks, we would need a bigger Cloud Composer cluster (in our case) to execute tasks that are essentially blocking while they shouldn't be (they should be async operations).

Options

  • Option 1: just launch the job and mark the Airflow task successful
  • Option 2: write a wrapper as explained here and use a reschedule mode as explained here

Option 1 does not seem feasible, as the DataflowTemplateOperator only has an option to specify the wait time between completion checks, called poll_sleep (source).

For the DataflowCreateJavaJobOperator there is an option check_if_running to wait for completion of a previous job with the same name (see this code).

It seems that after launching a job, the wait_for_finish is executed (see this line), which boils down to an "incomplete" job (see this line).

For Option 2, I need Option 1.

Questions

  1. Am I correct to assume that Dataflow tasks will block others in Cloud Composer/Airflow?
  2. Is there a way to schedule a job without a "wait to finish" using the built-in operators? (I might have overlooked something)
  3. Is there an easy way to write this myself? I'm thinking of just executing a bash launch script, followed by a task that checks whether the job finished correctly, but in a reschedule mode.
  4. Is there another way to avoid blocking other tasks while running dataflow jobs? Basically this is an async operation and should not take resources.

1 Answer


Answers

  1. Am I correct to assume that Dataflow tasks will block others in Cloud Composer/Airflow?
    A: Partly yes. Airflow has a parallelism option in its configuration, which defines the number of tasks that can execute at a time across the system. A task occupying one of these slots can slow down execution, but this issue is bound to happen as you increase the number of tasks and DAGs. You can increase the value in the configuration depending on your needs.

  2. Is there a way to schedule a job without a "wait to finish" using the built-in operators? (I might have overlooked something)
    A: Yes. You can use a PythonOperator, and in the python_callable you can use the Dataflow hook to launch the job in async mode (launch and don't wait).

  3. Is there an easy way to write this myself? I'm thinking of just executing a bash launch script, followed by a task that checks whether the job finished correctly, but in a reschedule mode.
    A: When you say reschedule, I'm assuming you are going to retry the task that checks whether the job finished correctly. If so, you can set that task to retry mode and configure the delay at which you want the retries to happen.

  4. Is there another way to avoid blocking other tasks while running dataflow jobs? Basically this is an async operation and should not take resources.
    A: I think I answered this in the second question.
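To sketch the async launch from answer 2: the snippet below launches a Dataflow template through the v1b3 REST API and returns as soon as the job is created, without polling for completion. This is a sketch, not the operator's own code; `build_launch_body` is a hypothetical helper shown only to illustrate the request shape, and the project, template path, and parameters are placeholders.

```python
def build_launch_body(job_name, parameters):
    """Assemble the body for a templates.launch request (hypothetical helper)."""
    return {"jobName": job_name, "parameters": parameters}

def launch_template_async(project, gcs_template_path, job_name, parameters):
    """Launch the template and return the job metadata without waiting.

    The import is deferred so this module loads even when
    google-api-python-client is not installed.
    """
    from googleapiclient.discovery import build

    dataflow = build("dataflow", "v1b3")
    request = dataflow.projects().templates().launch(
        projectId=project,
        gcsPath=gcs_template_path,
        body=build_launch_body(job_name, parameters),
    )
    # The call returns as soon as the job is created; we deliberately
    # do not poll for a terminal state here.
    return request.execute()
```

A callable like this can be handed to a PythonOperator; since it never waits, the worker slot is freed as soon as the launch request succeeds.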
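And a sketch of the check-in-reschedule-mode idea from question 3, assuming Airflow 2's PythonSensor and the Dataflow v1b3 jobs.get endpoint. `evaluate_job_state` and `_get_job_state` are hypothetical helpers, and the state names are a subset of the Dataflow JobState enum.

```python
# Terminal states from the Dataflow JobState enum (subset; a sketch).
SUCCESS_STATES = {"JOB_STATE_DONE"}
FAILURE_STATES = {"JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}

def evaluate_job_state(state):
    """True when the job succeeded, RuntimeError on failure, False otherwise.

    Returning False tells the sensor to reschedule itself and check again later.
    """
    if state in SUCCESS_STATES:
        return True
    if state in FAILURE_STATES:
        raise RuntimeError(f"Dataflow job ended in state {state}")
    return False

def _get_job_state(project, job_id):
    """Fetch the current job state via the Dataflow REST API (deferred import)."""
    from googleapiclient.discovery import build

    dataflow = build("dataflow", "v1b3")
    job = dataflow.projects().jobs().get(projectId=project, jobId=job_id).execute()
    return job.get("currentState")

def make_dataflow_sensor(task_id, project, job_id):
    """Build a PythonSensor in reschedule mode (requires apache-airflow)."""
    from airflow.sensors.python import PythonSensor

    return PythonSensor(
        task_id=task_id,
        python_callable=lambda: evaluate_job_state(_get_job_state(project, job_id)),
        mode="reschedule",   # release the worker slot instead of sleeping in it
        poke_interval=300,   # re-check every five minutes
    )
```

With mode="reschedule" the sensor gives up its slot between pokes, which is exactly the "async operation should not take resources" behaviour asked for in question 4.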