
Context: I've defined an Airflow DAG which performs an operation, compute_metrics, on some data for an entity based on a parameter called org. Under the hood, something like myapi.compute_metrics(org) is called. This flow will mostly be run on an ad-hoc basis.

Problem: I'd like to be able to select the org to run the flow against when I manually trigger the DAG from the airflow UI.

The most straightforward solution I can think of is to generate n different DAGs, one for each org. The DAGs would have ids like compute_metrics_1, compute_metrics_2, and so on; when I need to trigger compute_metrics for a single org, I pick the DAG for that org. But this doesn't scale as I add orgs, and it scales even worse as I add more types of computation.

I've done some research and it seems that I can create a flask blueprint for airflow, which, to my understanding, extends the UI. In this extended UI I could add input components, like a text box, for picking an org, and then pass that as the conf of a DagRun created manually by the blueprint. Is that correct? I'm imagining I could write something like:

from datetime import datetime

from airflow import settings
from airflow.models import DagRun
from airflow.utils.state import State

session = settings.Session()

execution_date = datetime.now()
run_id = 'external_trigger_' + execution_date.isoformat()

trigger = DagRun(
    dag_id='general_compute_metrics_needs_org_id',
    run_id=run_id,
    state=State.RUNNING,
    execution_date=execution_date,
    external_trigger=True,
    # conf must be a dict; pass the org id from a component in the blueprint
    conf={'org': org_ui_component.text})
session.add(trigger)
session.commit()  # I don't know if this would actually be picked up by the scheduler

Is my idea sound? Is there a better way to achieve what I want?

Maybe a view is actually the right tool to use. – Paymahn Moghadasian

Can you post the code for the custom view you created to achieve this, for others to use as an example? – melchoir55

1 Answer


I've done some research and it seems that I can create a flask blueprint for airflow, which to my understanding, extends the UI.

The blueprint extends the API. If you want some UI for it, you'll need to serve a template view. The most feature-complete way to achieve this is to develop your own Airflow Plugin.

If you want to manually create DagRuns, you can use this trigger as a reference. For simplicity, though, I'd trigger the DAG through the API.
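For example, Airflow's experimental REST API exposes POST /api/experimental/dags/&lt;dag_id&gt;/dag_runs, which accepts a JSON body with a conf payload that the DagRun receives as dag_run.conf. A minimal sketch of building such a request (the base URL, DAG id, and org value here are placeholders of my own, not from the question):

```python
import json

def build_trigger_request(base_url, dag_id, org):
    """Build the URL and JSON body for Airflow's experimental
    trigger endpoint: POST /api/experimental/dags/<dag_id>/dag_runs."""
    url = '{}/api/experimental/dags/{}/dag_runs'.format(base_url, dag_id)
    # Everything under 'conf' is handed to the DagRun as dag_run.conf
    body = json.dumps({'conf': {'org': org}})
    return url, body

url, body = build_trigger_request('http://localhost:8080', 'compute_metrics', 'acme')
# then send it, e.g.:
# requests.post(url, data=body, headers={'Content-Type': 'application/json'})
```

This keeps the triggering logic out of Airflow's internal session handling entirely, which is why I'd prefer it over constructing DagRun objects by hand.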

And specifically about your problem, I would have a single DAG compute_metrics that reads the org from an Airflow Variable. They are global and can be set dynamically. You can prefix the variable name with something like the DagRun id to make it unique and thus dag-concurrent safe.
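To make the variable lookup concrete, here is a sketch of the naming scheme, with a plain dict standing in for Airflow's Variable store (the key format and function names are my own assumptions). In a real task you would call Variable.get() with the same key:

```python
def org_variable_key(run_id):
    # Prefix the variable name with the DagRun id so concurrent
    # runs each read their own value (dag-concurrent safe).
    return 'compute_metrics_org_{}'.format(run_id)

def read_org_for_run(run_id, variables):
    # 'variables' stands in for the Airflow Variable store; inside a
    # real task you would do something like:
    #   org = Variable.get(org_variable_key(context['run_id']))
    return variables[org_variable_key(run_id)]
```

Whoever triggers the run (UI, API, or CLI) sets the variable first, and the task reads and optionally deletes it when it starts.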