
I'm attempting to use a Cluster Policy in Airflow 1.9. I followed the instructions in the official documentation, but it doesn't seem to be taking effect.

In my file at $AIRFLOW_HOME/config/airflow_local_settings.py, I've defined the method as the docs instructed and it has the following signature:

def policy(task_instance):

Additional concerns:

  • What Airflow component is actually running the policy code (is it the scheduler)?
  • Is there a recommended way to unit test cluster policy code? If not, what about local testing?

Can anyone help me understand why this Cluster Policy isn't taking effect?

I'm using Airflow 1.9.

Is airflow_local_settings.py in your PYTHONPATH? - chris.mclennon
Hmm, when I check via os.environ['PYTHONPATH'], it looks like I don't even have PYTHONPATH set. Should I be setting that explicitly? - Kasa
Yup, that would explain why. The directory containing airflow_local_settings.py needs to be on your PYTHONPATH for the file to get picked up. You can set it globally on your Airflow instance or add that env var when invoking your airflow commands (e.g. PYTHONPATH=$PYTHONPATH:/path/to/dir/ airflow scheduler). - chris.mclennon
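
A quick way to check the point made in the comments above, as a minimal sketch: run this with the same interpreter and environment that start the scheduler. The helper name locate_module is hypothetical; importlib.util.find_spec is part of the standard library and reports where (or whether) a module would be imported from.

```python
import importlib.util


def locate_module(name="airflow_local_settings"):
    """Return the file a top-level module would be imported from, or None."""
    spec = importlib.util.find_spec(name)
    return spec.origin if spec is not None else None


if __name__ == "__main__":
    # None means the directory holding the file is not on sys.path,
    # so PYTHONPATH (or the install layout) needs adjusting.
    print(locate_module())
```

If this prints None, the policy file cannot be imported, which would be consistent with the policy never being applied.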

1 Answer


So you seem to have the file in the right place according to the documentation: https://github.com/apache/airflow/blob/master/docs/concepts.rst#where-to-put-airflow_local_settingspy

And your signature is right: https://airflow.apache.org/docs/stable/concepts.html#mutate-tasks-after-dag-loaded

But you haven't shown what your policy does or in what way it "did not work".

I believe a function with the def policy(task): signature is run by the scheduler after DAG parsing (as the docs seem to say), while one with the def task_instance_mutation_hook(ti): signature is run by the task executor on the worker. That's probably why some of your changes aren't taking effect.

For example, a timeout or queue is something the scheduler enforces, but a connection ID is something the worker needs to know during execution.

So if you were trying to apply a timeout policy, it should have worked; if you were trying to enforce a connection ID, it would not have.
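
To make that concrete, here is a minimal sketch of a scheduler-side policy that adjusts timeout and queue. The 48-hour cap, the check against a queue named "default", and the "batch" queue name are made-up values for illustration; execution_timeout and queue are real operator attributes. Because the function only reads and sets attributes, it can also be exercised locally with a plain stand-in object, with no scheduler running, which is one possible approach to the unit-testing part of the question.

```python
from datetime import timedelta
from types import SimpleNamespace

MAX_TIMEOUT = timedelta(hours=48)  # hypothetical cluster-wide cap


def policy(task):
    """Mutate a task in place after its DAG has been loaded."""
    # Set a timeout where none exists, and cap any that exceed the limit.
    if task.execution_timeout is None or task.execution_timeout > MAX_TIMEOUT:
        task.execution_timeout = MAX_TIMEOUT
    # Route tasks left on the (assumed) default queue to a hypothetical one.
    if task.queue == "default":
        task.queue = "batch"


# Local check with a stand-in object instead of a real operator.
fake_task = SimpleNamespace(execution_timeout=None, queue="default")
policy(fake_task)
print(fake_task.execution_timeout, fake_task.queue)
```

The same function body would go in airflow_local_settings.py; the stand-in only mimics the two attributes the policy touches, so a test against a real operator is still worth adding.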