I'm trying to create a DAG in Cloud Composer. When importing, I'm getting the following error:
Broken DAG: [/home/airflow/gcs/dags/airflow_bigquery_v12.py] cannot import name _parse_data
This is the DAG file. As you'll see, it loads a Cloud Storage file into BigQuery:
import datetime

from airflow import DAG, models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.operators.dummy_operator import DummyOperator

seven_days_ago = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(7),
    datetime.datetime.min.time())
YESTERDAY = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())
DEFAULT_ARGS = {
    # Setting start_date to yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': YESTERDAY,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=1),
    'project_id': models.Variable.get('gcp_project')
}
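One thing to note about DEFAULT_ARGS: models.Variable.get() runs every time the scheduler parses the file, so if the 'gcp_project' Airflow Variable is missing, that alone is enough to break the DAG on import. A defensive sketch (the default value below is just a placeholder, not my real project id):

from airflow import models

# Variable.get raises at parse time when 'gcp_project' is unset; passing
# default_var keeps the DAG file importable ('my-project' is a placeholder).
PROJECT_ID = models.Variable.get('gcp_project', default_var='my-project')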
with DAG('airflow_bigquery_v12',
         default_args=DEFAULT_ARGS,
         schedule_interval=datetime.timedelta(days=1),
         catchup=False
         ) as dag:

    start_task = DummyOperator(task_id='start', dag=dag)
    end_task = DummyOperator(task_id='end', dag=dag)

    gcs_to_bigquery_rides = GoogleCloudStorageToBigQueryOperator(
        dag=dag,
        task_id='load_to_BigQuery_stage',
        bucket='my_bucket',
        destination_project_dataset_table='misc.pg_rides_json_airflow',
        source_format='NEWLINE_DELIMITED_JSON',
        source_objects=['rides_new.json'],
        # ignore_unknown_values=True,
        # schema_fields=dc(),
        schema_object='rides_schema.json',
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE',
        # skip_leading_rows=1,
        google_cloud_storage_conn_id='google_cloud_storage_default',
        bigquery_conn_id='bigquery_default'
    )

    start_task >> gcs_to_bigquery_rides >> end_task
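Since a "Broken DAG" message just means the file failed to import, I can reproduce most parse-time errors locally with a plain import before uploading (the path below is an assumption; adjust it to wherever your copy of the file lives):

# Importing the DAG file surfaces the same errors the scheduler hits
# when it parses the file (Python 3.5+; the path is an assumption).
import importlib.util

spec = importlib.util.spec_from_file_location(
    'airflow_bigquery_v12', 'airflow_bigquery_v12.py')
dag_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(dag_module)  # raises if the DAG is broken
print('imported OK:', dag_module.dag.dag_id)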
For reference, this is the rides_schema.json file, found inside 'my_bucket', which holds the schema for the table to be created:
[
{
"mode": "NULLABLE",
"name": "finish_picture_state",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "finish_picture_file_id",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "finish_reason",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "starting_battery_level",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "finished_at",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "created_at",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "ending_battery_level",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "state",
"type": "STRING"
},
{
"fields": [
{
"mode": "NULLABLE",
"name": "currency",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "amount",
"type": "INTEGER"
}
],
"mode": "NULLABLE",
"name": "cost",
"type": "RECORD"
},
{
"mode": "NULLABLE",
"name": "stoped_since",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "user_id",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "minutes",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "id",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "vehicle_id",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "distance",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "service_area_id",
"type": "STRING"
},
{
"fields": [
{
"fields": [
{
"mode": "NULLABLE",
"name": "currency",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "amount",
"type": "INTEGER"
}
],
"mode": "NULLABLE",
"name": "base",
"type": "RECORD"
},
{
"fields": [
{
"mode": "NULLABLE",
"name": "currency",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "amount",
"type": "INTEGER"
}
],
"mode": "NULLABLE",
"name": "per_minute",
"type": "RECORD"
}
],
"mode": "NULLABLE",
"name": "pricing",
"type": "RECORD"
},
{
"fields": [
{
"mode": "NULLABLE",
"name": "m",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "latitude",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "longitude",
"type": "FLOAT"
}
],
"mode": "REPEATED",
"name": "path",
"type": "RECORD"
}
]
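In case it helps, here's a minimal sketch I use to sanity-check a schema file like this locally before uploading (the required keys are an assumption based on BigQuery's field format; the file name matches the one above):

import json

REQUIRED_KEYS = {'name', 'type'}  # every BigQuery field entry needs these

def check_fields(fields, path='root'):
    # Walk the schema recursively; RECORD fields nest children under 'fields'.
    for field in fields:
        missing = REQUIRED_KEYS - field.keys()
        if missing:
            print('%s: field %r is missing %s'
                  % (path, field.get('name', '<unnamed>'), sorted(missing)))
        check_fields(field.get('fields', []),
                     '%s.%s' % (path, field.get('name', '?')))

with open('rides_schema.json') as f:
    check_fields(json.load(f))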
Your help is much appreciated. Thanks!
The _parse_data function appears in BigQueryHook: github.com/apache/airflow/blob/1.9.0/airflow/contrib/hooks/…. This hook is used by GoogleCloudStorageToBigQueryOperator, but only on Airflow version 1.9.0. Is that the version you're running? By the way, can you reproduce the error in a local Airflow install, or does it only happen when deployed on Composer? – norbjd
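To check which Airflow version a Composer environment actually runs, the simplest thing is to print it from a worker shell or a one-off task (airflow.__version__ is a standard attribute):

# Prints the Airflow version installed in the environment.
import airflow

print(airflow.__version__)  # e.g. '1.9.0' on older Composer images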