Context: I've defined an Airflow DAG that performs an operation, `compute_metrics`, on some data for an entity, based on a parameter called `org`. Under the hood, something like `myapi.compute_metrics(org)` is called. This flow will mostly be run on an ad-hoc basis.
Problem: I'd like to be able to select the `org` to run the flow against when I manually trigger the DAG from the Airflow UI.
The most straightforward solution I can think of is to generate `n` different DAGs, one for each org. The DAGs would have ids like `compute_metrics_1`, `compute_metrics_2`, etc., and when I need to trigger compute metrics for a single org, I can pick the DAG for that org. This doesn't scale as I add orgs and as I add more types of computation.
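To make the scaling problem concrete, here is a rough sketch of the per-org scheme as a DAG-id factory loop. This is plain Python (the `COMPUTATIONS`/`ORGS` names are made up for illustration); in a real DAG file each generated id would get its own `airflow.DAG` object registered via `globals()[dag_id] = dag`. The id count is the product of orgs and computation types, which is exactly what blows up:

```python
# Hypothetical sketch: one DAG id per (computation, org) pair.
# In an actual DAG-factory file, each id would be bound to an
# airflow.DAG instance so the scheduler picks it up.

COMPUTATIONS = ["compute_metrics"]  # grows as I add computation types
ORGS = [1, 2, 3]                    # grows as I add orgs

def per_org_dag_ids(computations, orgs):
    """Every combination needs its own DAG id, e.g. compute_metrics_1."""
    return ["%s_%s" % (c, o) for c in computations for o in orgs]

print(per_org_dag_ids(COMPUTATIONS, ORGS))
# -> ['compute_metrics_1', 'compute_metrics_2', 'compute_metrics_3']
```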
I've done some research, and it seems that I can create a Flask blueprint for Airflow, which, to my understanding, extends the UI. In this extended UI I could add input components, like a text box, for picking an org, and then pass that as the `conf` of a `DagRun` that the blueprint creates manually. Is that correct? I'm imagining I could write something like:
```python
from datetime import datetime

from airflow import settings
from airflow.models import DagRun
from airflow.utils.state import State

session = settings.Session()
execution_date = datetime.now()
run_id = 'external_trigger_' + execution_date.isoformat()
trigger = DagRun(
    dag_id='general_compute_metrics_needs_org_id',
    run_id=run_id,
    state=State.RUNNING,
    execution_date=execution_date,
    external_trigger=True,
    # pass the org id from a text box component in the blueprint
    conf={'org': org_ui_component.text})
session.add(trigger)
session.commit()  # I don't know if this would actually be picked up by the scheduler
```
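On the other side, the task in the general DAG would need to read the org back out of the `conf` attached to its `DagRun`. A minimal sketch of that extraction, assuming `conf` is passed as a dict with an `'org'` key (the key name and default are my assumptions, not anything Airflow mandates):

```python
# Hypothetical helper: how the python_callable in the general DAG
# might pull the org out of the conf dict passed at trigger time.
def org_from_conf(conf, default=None):
    """conf is the dict attached to the DagRun (dag_run.conf in the task context)."""
    if not conf:
        return default
    return conf.get("org", default)

# Inside the task, roughly:
#   org = org_from_conf(context["dag_run"].conf)
#   myapi.compute_metrics(org)

print(org_from_conf({"org": 42}))  # -> 42
```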
Is my idea sound? Is there a better way to achieve what I want?