4
votes

I'm not an expert Python developer, so I'll just outline my steps.

SETUP PART

I created a directory ~/Desktop/airflow for apache-airflow and set

export AIRFLOW_HOME=~/Desktop/airflow

Then I created a venv using

python3 -m venv ~/Desktop/airflow

(Screenshot of the result; image not available.)

Then I ran

source bin/activate

pip3 install apache-airflow==1.10.9

airflow initdb

(Screenshot of the result; image not available.)

In my airflow.cfg file I checked the dags and plugins directory settings. I created the dags and plugins directories inside $AIRFLOW_HOME (~/Desktop/airflow).

I started airflow webserver and scheduler and made sure that everything works fine.

CUSTOM PLUGIN PART

I found many ways to create Airflow plugins and tried all of them. Let's start.

The first one is to create a plugin folder (first_plugin) inside a project and then create a Python file (first_operator.py) in it:

import logging

# BaseOperator is defined in airflow.models; that is the documented import location.
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults

log = logging.getLogger(__name__)


class FirstOperator(BaseOperator):
    """Minimal operator that logs a greeting when the task runs."""

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(FirstOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        log.info("Hello World!")


class FirstOperatorPlugin(AirflowPlugin):
    # The plugin name and the operators it registers with Airflow.
    name = "first_plugin"
    operators = [FirstOperator]

(Screenshot of the resulting project layout; image not available.)

Then I just moved my plugin folder (first_plugin) to $AIRFLOW_HOME/plugins (~/Desktop/airflow/plugins) and restarted the airflow webserver and scheduler.

Now it's time to create a custom DAG that uses my custom operator. Importing the plugin correctly is the challenge. There are many possible ways to import custom operators. I'll show what I tried.

  1. from airflow.operators import FirstOperator - deprecated
  2. from airflow.operators.first_plugin import FirstOperator
  3. from airflow.operators.first_operator import FirstOperator
  4. from first_plugin.first_operator import FirstOperator

None of these imports resolves in the PyCharm IDE. For example,

from airflow.operators.first_plugin import FirstOperator

(Screenshot from PyCharm; image not available.)

But if I ignore the IDE warning on the import line and put my custom DAG into the dags folder, it works fine (I tried). I also decided to check the airflow logs (in DEBUG mode).

These are the logs I see when I restart the airflow webserver (screenshot; image not available).

I have spent 2 days on this and still have no solution. You will probably tell me to try other ways. I have tried these:

https://www.astronomer.io/guides/airflow-importing-custom-hooks-operators/

https://pybit.es/introduction-airflow.html

All of them work at runtime, but none of them solves my IDE import problem.


2 Answers

0
votes

I bypassed the plugins step and did it in a plain Python way. That also seems to be what the docs recommend for the latest version (1.10.12).
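As far as I understand it, the reason the plugin-style imports confuse an IDE is that Airflow 1.10 builds modules such as airflow.operators.first_plugin in memory at startup and registers them in sys.modules, so there is no file on disk for static analysis to find. The sketch below imitates that mechanism without Airflow installed; demo_operators is a hypothetical package name and FirstOperator is a plain stand-in class, not the real operator.

```python
import sys
import types


class FirstOperator:
    """Stand-in for a custom operator; a plain class so Airflow is not required."""

    def execute(self, context):
        return "Hello World!"


# Build module objects in memory and register them under dotted names.
# No demo_operators/first_plugin.py file exists anywhere on disk.
parent = types.ModuleType("demo_operators")  # hypothetical package name
plugin_module = types.ModuleType("demo_operators.first_plugin")
plugin_module.FirstOperator = FirstOperator
parent.first_plugin = plugin_module
sys.modules["demo_operators"] = parent
sys.modules["demo_operators.first_plugin"] = plugin_module

# The import works at runtime because Python consults sys.modules first,
# but an IDE that only scans source files has nothing to resolve it against.
from demo_operators.first_plugin import FirstOperator as Imported

print(Imported is FirstOperator)
```

A regular package on the Python path does not have this gap, which is why the layout described next tends to resolve in the IDE as well.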

This is my folder structure:

AIRFLOW_HOME
├── dags
│   ├── __init__.py
│   └── my_dag.py
└── utils
    ├── __init__.py
    └── custom_operator.py

Make sure that PYTHONPATH contains the AIRFLOW_HOME path. With the structure above, and assuming custom_operator.py defines a HelloOperator class, you can then import it like this:

from utils.custom_operator import HelloOperator
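A minimal sketch of why the PYTHONPATH entry matters, using the utils/custom_operator.py file from the tree above. It recreates that layout in a temporary directory standing in for AIRFLOW_HOME and adds it to sys.path, which has the same effect as PYTHONPATH for the current process. The HelloOperator body is a hypothetical plain class so that the sketch runs without Airflow installed.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Throwaway stand-in for AIRFLOW_HOME containing the utils package.
airflow_home = Path(tempfile.mkdtemp())
utils_dir = airflow_home / "utils"
utils_dir.mkdir()
(utils_dir / "__init__.py").write_text("")

# Hypothetical operator body: a plain class instead of a BaseOperator subclass.
(utils_dir / "custom_operator.py").write_text(
    "class HelloOperator:\n"
    "    def execute(self, context):\n"
    "        return 'Hello World!'\n"
)

# Same effect as adding AIRFLOW_HOME to PYTHONPATH, for this process only.
sys.path.insert(0, str(airflow_home))
importlib.invalidate_caches()

from utils.custom_operator import HelloOperator

result = HelloOperator().execute(context={})
print(result)
```

In PyCharm, marking the AIRFLOW_HOME directory as a sources root should give the IDE the same search path, so the import resolves there too.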

Read more here: https://airflow.apache.org/docs/stable/howto/custom-operator.html#creating-a-custom-operator

-1
votes

This is the folder structure that worked for me. Make sure the custom operators are inside an operators folder; the same goes for sensors and hooks. The __init__.py files should be empty.

plugins
├── __init__.py
├── operators
│   ├── __init__.py
│   ├── glue_crawler_operator.py
│   └── gsheet_to_redshift_operator.py
└── sensors
    ├── __init__.py
    └── glue_crawler_sensor.py

Also check that plugins_folder in the airflow.cfg file points to your plugins folder.
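One way to do that check from Python is sketched below. It assumes airflow.cfg can be read as an INI file with the standard configparser module; the sample text and its paths are placeholders, not my real configuration.

```python
import configparser
import os

# Hypothetical excerpt of airflow.cfg; the paths are placeholders.
SAMPLE_CFG = """
[core]
dags_folder = /home/user/airflow/dags
plugins_folder = /home/user/airflow/plugins
"""


def read_plugins_folder(cfg_text):
    """Return the [core] plugins_folder value from airflow.cfg-style text."""
    # interpolation=None stops '%' characters in other options from being interpreted.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return os.path.expanduser(parser.get("core", "plugins_folder"))


plugins_folder = read_plugins_folder(SAMPLE_CFG)
print(plugins_folder, "exists:", os.path.isdir(plugins_folder))
```

For a real installation, read the text from $AIRFLOW_HOME/airflow.cfg instead of the sample string. Keep in mind that an AIRFLOW__CORE__PLUGINS_FOLDER environment variable, if set, takes precedence over the file.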

For example, to import the gsheet_to_redshift_operator operator without errors I used the following statement:

from operators.gsheet_to_redshift_operator import GsheetToRSOperator