
We need to pause a process flow until the client confirms he is OK with the data, then continue the flow.

How we are doing it: we prepare an email with a PDF attached and send it to the client for validation. If he agrees that the PDF is properly done, the client resumes the flow.

We pause the flow and try to restart it by having two parallel tasks, “send_validation_email_pdf” and “user_validation”. We set the task “user_validation” to fail on purpose. At the same time, the task “send_validation_email_pdf” sends the PDF documents with a link that allows the client to mark the task “user_validation” as success.

Hyperlink example:

http://localhost:8080/admin/airflow/success?task_id=user_validation&dag_id=rf.duree&upstream=false&downstream=false&future=false&past=false&execution_date=2019-05-24T00%3A00%3A00%2B00%3A00&origin=http%3A%2F%2Flocalhost%3A8080%2Fadmin%2Fairflow%2Ftree%3Fdag_id%3Drf.duree&confirmed=true
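
For reference, a link of this shape can be assembled with the standard library. This is only a minimal sketch: the function name `build_mark_success_url` is hypothetical, and the query parameters are simply copied from the example URL above.

```python
from urllib.parse import urlencode


def build_mark_success_url(base_url, dag_id, task_id, execution_date):
    """Build a "mark success" link shaped like the example URL above.

    Hypothetical helper: the parameter names and values mirror the example,
    nothing else is assumed about the webserver.
    """
    params = {
        "task_id": task_id,
        "dag_id": dag_id,
        "upstream": "false",
        "downstream": "false",
        "future": "false",
        "past": "false",
        "execution_date": execution_date,
        # Page to return to after the action, taken from the example URL.
        "origin": f"{base_url}/admin/airflow/tree?dag_id={dag_id}",
        "confirmed": "true",
    }
    # urlencode percent-encodes ":", "+", "/", "?" and "=" inside the values.
    return f"{base_url}/admin/airflow/success?{urlencode(params)}"


example_url = build_mark_success_url(
    base_url="http://localhost:8080",
    dag_id="rf.duree",
    task_id="user_validation",
    execution_date="2019-05-24T00:00:00+00:00",
)
print(example_url)
```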

We would like this task to be set to success and the entire flow to resume. However, marking this task as “success” is not enough: the next task remains in the state “upstream_failed” and does not re-run.

I tried adding the parameter trigger_rule=TriggerRule.ALL_SUCCESS to the next task, “fin_send_email_validation”, but it did not work. I also tried the parameter depends_on_past=True on “fin_send_email_validation”; that did not work either.

Does anyone have a better idea of how to pause the process flow until the client confirms he is OK with the data, and then continue it? Or could someone suggest how to unblock what I am already trying to do?

```python
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

send_validation_email_pdf = PythonOperator(
    task_id="send_validation_email_pdf",
    provide_context=True,
    python_callable=set_send_validation_email_pdf,
    dag=dag,
)

user_validation = PythonOperator(
    task_id="user_validation",
    retries=0,
    email_on_failure=False,
    python_callable=user_validation,
    dag=dag,
)

fin_send_email_validation = DummyOperator(
    task_id="fin_send_email_validation",
    trigger_rule=TriggerRule.ALL_SUCCESS,
    depends_on_past=True,
    dag=dag,
)

fin_refresh_TDE >> send_validation_email_pdf >> fin_send_email_validation
fin_refresh_TDE >> user_validation >> fin_send_email_validation
```


Pause process flow, until the client confirmed he is OK with the data, then continue the flow.

1 Answer


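Have a task that just sleeps, with a timeout slightly longer than the sleep. When you are satisfied, just mark it as success and the rest of the workflow should continue.

import time
from datetime import timedelta

from airflow.operators.python_operator import PythonOperator


def wait_for_user_validation():
    # Sleep for just under 24 hours; marking the task as success ends the wait early.
    time.sleep(86350)


user_validation = PythonOperator(
    task_id="user_validation",
    retries=0,
    email_on_failure=False,
    # soft_fail and timeout are sensor arguments; PythonOperator takes execution_timeout.
    execution_timeout=timedelta(seconds=86400),
    python_callable=wait_for_user_validation,
    dag=dag,
)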
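
A possible variation on the sleeping callable, if you would rather have the task finish on its own than be marked as success by hand: poll for an approval marker and give up after a timeout. This is only a sketch under stated assumptions. The marker file, the function name `wait_for_approval`, and its parameters are all hypothetical and not part of the original setup; something else (for example a small endpoint behind the emailed link) would have to create the marker.

```python
import os
import time


def wait_for_approval(marker_path, timeout=86400, poke_interval=60):
    """Block until marker_path exists, or raise TimeoutError after timeout seconds.

    Hypothetical helper: assumes the client's approval is recorded by creating
    a file (or directory) at marker_path.
    """
    deadline = time.monotonic() + timeout
    while not os.path.exists(marker_path):
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "no approval at {} after {} seconds".format(marker_path, timeout)
            )
        # Wait a little before checking again instead of busy-looping.
        time.sleep(poke_interval)
    return True
```

It could then be passed as the `python_callable` of the “user_validation” task, with the marker path supplied through `op_kwargs`. If the exception is raised the task fails, so downstream tasks do not run without an approval.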