I'm not able to connect to SQL Server from Airflow running under docker-compose. I want to move data from SQL Server directly to Cloud Storage and then load it into BigQuery.
How can I solve this?
import json
from datetime import timedelta, datetime
from airflow import DAG
from airflow.models import Variable
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
# Arguments applied to every task of the DAG below.
default_args = dict(
    owner='Test Data',
    depends_on_past=True,
    start_date=datetime(2019, 5, 29),
    end_date=datetime(2019, 5, 30),
    # NOTE(review): this address looks like a scraped placeholder; confirm the real recipient.
    email=['[email protected]'],
    # NOTE(review): failure emails need a working [smtp] config. The task log shows a
    # missing smtp_user warning followed by "[Errno 99] Cannot assign requested address"
    # right after the failure, which suggests the alert could not be sent.
    email_on_failure=True,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)
# Set Schedule: run the pipeline once a day, at midnight.
# Use cron to define an exact time, e.g. 8:15am would be "15 08 * * *".
# The previous value "* * * * *" fired every minute, contradicting the
# once-a-day intent: each of those runs re-extracted and truncated the
# same destination table.
schedule_interval = "0 0 * * *"
# Define the DAG: give it an ID and attach the shared task defaults and schedule.
# NOTE(review): the ID does not describe this SQL -> GCS -> BigQuery pipeline;
# Airflow keys DAGs by ID, so renaming it would register a new DAG.
dag = DAG(
    dag_id='bigquery_github_trends',
    schedule_interval=schedule_interval,
    default_args=default_args,
)
# Extract: run the query on the source database and stage the result in GCS,
# together with a schema file for the BigQuery load.
# NOTE(review): this operator goes through Airflow's MySQL hook (the task log's
# traceback ends in MySQLdb.connect), so 'mysql_hml' must point at a *MySQL*
# server. "111 Connection refused" on 10.0.0.1 means nothing accepted a TCP
# connection at that host/port from inside the container. Check the host, the
# port, and that the DB listens on a non-loopback interface. If the source is
# actually Microsoft SQL Server, a MySQL client cannot talk to it at all; an
# MSSQL-based extract (pymssql via the `mssql` Airflow extra) is needed instead.
extract = MySqlToGoogleCloudStorageOperator(
    task_id='chama_extract',
    dag=dag,
    mysql_conn_id='mysql_hml',
    sql="""SELECT * FROM test""",
    google_cloud_storage_conn_id='my_gcp_conn',
    bucket='my_bucket',
    # Per the operator docs, "{}" receives a file number when the export is split by size.
    filename='test/test{}.json',
    schema_filename='schemas/test.json',
)
# Load: import the staged newline-delimited JSON files from GCS into BigQuery,
# creating the table if needed and replacing its contents on every run.
load = GoogleCloudStorageToBigQueryOperator(
    task_id='chama_load',
    bigquery_conn_id='my_gcp_conn',
    google_cloud_storage_conn_id='my_gcp_conn',
    bucket='my_bucket',
    destination_project_dataset_table="tst.teste123",
    # The extract writes 'test/test{}.json' and numbers the files 0, 1, 2, ...
    # when the export is split. Listing only 'test/test0.json' silently dropped
    # every file after the first, so a wildcard is used to load all of them.
    # NOTE(review): the wildcard also matches stale files left by an earlier,
    # larger export; clear the prefix first if that can happen.
    source_objects=['test/test*.json'],
    schema_object='schemas/test.json',
    source_format='NEWLINE_DELIMITED_JSON',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    dag=dag,
)
# Task ordering: extract runs first, then load.
extract >> load
docker-compose.yml
version: '3'
# Nesting restored: with every key at column 0, `services:` was empty and
# each service became a top-level key, which docker-compose rejects.
services:
  postgres:
    image: postgres:9.6
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"
  webserver:
    image: puckel/docker-airflow:1.10.1
    build:
      context: https://github.com/puckel/docker-airflow.git#1.10.1
      dockerfile: Dockerfile
      args:
        # Extra apache-airflow extras for the image build.
        # NOTE(review): a Microsoft SQL Server source would presumably also need
        # `mssql` here (pymssql); rebuild with `docker-compose build` after changing it.
        AIRFLOW_DEPS: gcp_api,s3
    restart: always
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
      # NOTE(review): the Fernet key is committed in plain text; anyone holding it
      # can decrypt connection passwords stored in the metadata DB.
      - FERNET_KEY=jsDPRErfv8Z_eVTnGfF8ywd19j4pyqE3NpdUBA_oRTo=
    volumes:
      - ./examples/intro-example/dags:/usr/local/airflow/dags
      # Uncomment to include custom plugins
      # - ./plugins:/usr/local/airflow/plugins
    ports:
      - "8080:8080"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3
docker-compose-gcloud.yml
version: '3'
# Nesting restored: with every key at column 0, `services:` was empty and
# each service became a top-level key, which docker-compose rejects.
services:
  postgres:
    image: postgres:9.6
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"
  webserver:
    image: puckel/docker-airflow:1.10.1
    build:
      context: https://github.com/puckel/docker-airflow.git#1.10.1
      dockerfile: Dockerfile
      args:
        # Extra apache-airflow extras for the image build.
        # NOTE(review): a Microsoft SQL Server source would presumably also need
        # `mssql` here (pymssql); rebuild with `docker-compose build` after changing it.
        AIRFLOW_DEPS: gcp_api,s3
    restart: always
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
      # NOTE(review): the Fernet key is committed in plain text; anyone holding it
      # can decrypt connection passwords stored in the metadata DB.
      - FERNET_KEY=jsDPRErfv8Z_eVTnGfF8ywd19j4pyqE3NpdUBA_oRTo=
    volumes:
      - ./examples/gcloud-example/dags:/usr/local/airflow/dags
      # Uncomment to include custom plugins
      # - ./plugins:/usr/local/airflow/plugins
    ports:
      - "8080:8080"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3
I start the containers with this command:
docker-compose -f docker-compose-gcloud.yml up --abort-on-container-exit
Error message in the Airflow task log:
[2019-05-29 07:00:37,938] {{logging_mixin.py:95}} INFO - [2019-05-29 07:00:37,937] {{base_hook.py:83}} INFO - Using connection to: 10.0.0.1
[2019-05-29 07:00:58,974] {{models.py:1760}} ERROR - (2003, 'Can\'t connect to MySQL server on 10.0.0.1 (111 "Connection refused")')
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 105, in execute
cursor = self._query_mysql()
File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 127, in _query_mysql
conn = mysql.get_conn()
File "/usr/local/lib/python3.6/site-packages/airflow/hooks/mysql_hook.py", line 103, in get_conn
conn = MySQLdb.connect(**conn_config)
File "/usr/local/lib/python3.6/site-packages/MySQLdb/__init__.py", line 84, in Connect
return Connection(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line 164, in __init__
super(Connection, self).__init__(*args, **kwargs2)
MySQLdb._exceptions.OperationalError: (2003, 'Can\'t connect to MySQL server on 10.0.0.1 (111 "Connection refused")')
[2019-05-29 07:00:58,988] {{models.py:1789}} INFO - All retries failed; marking task as FAILED
[2019-05-29 07:00:58,992] {{logging_mixin.py:95}} INFO - [2019-05-29 07:00:58,991] {{configuration.py:255}} WARNING - section/key [smtp/smtp_user] not found in config
[2019-05-29 07:00:58,998] {{models.py:1796}} ERROR - [Errno 99] Cannot assign requested address
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 105, in execute
cursor = self._query_mysql()
File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 127, in _query_mysql
conn = mysql.get_conn()
File "/usr/local/lib/python3.6/site-packages/airflow/hooks/mysql_hook.py", line 103, in get_conn
conn = MySQLdb.connect(**conn_config)
File "/usr/local/lib/python3.6/site-packages/MySQLdb/__init__.py", line 84, in Connect
return Connection(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line 164, in __init__
super(Connection, self).__init__(*args, **kwargs2)
MySQLdb._exceptions.OperationalError: (2003, 'Can\'t connect to MySQL server on 10.0.0.1 (111 "Connection refused")')