
I'm trying to create a DAG in Cloud Composer. When it is imported, I get the following error:

Broken DAG: [/home/airflow/gcs/dags/airflow_bigquery_v12.py] cannot import name _parse_data

This is the DAG file. As you'll see, it tries to load a Cloud Storage file into BigQuery:

from datetime import datetime, timedelta

from airflow import DAG, models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.operators.dummy_operator import DummyOperator

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())
YESTERDAY = datetime.combine(datetime.today() - timedelta(1),
                             datetime.min.time())

DEFAULT_ARGS = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': YESTERDAY,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=1),
    'project_id': models.Variable.get('gcp_project')
}


with DAG('airflow_bigquery_v12',
         default_args=DEFAULT_ARGS,
         schedule_interval=timedelta(days=1),
         catchup=False
         ) as dag:

    # Lightweight marker tasks for the start and end of the pipeline.
    start_task = DummyOperator(task_id='start')
    end_task = DummyOperator(task_id='end')

    gcs_to_bigquery_rides = GoogleCloudStorageToBigQueryOperator(
        task_id='load_to_BigQuery_stage',
        bucket='my_bucket',
        destination_project_dataset_table='misc.pg_rides_json_airflow',
        source_format='NEWLINE_DELIMITED_JSON',
        source_objects=['rides_new.json'],
        # ignore_unknown_values=True,
        # schema_fields=dc(),
        schema_object='rides_schema.json',
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE',
        # skip_leading_rows=1,
        google_cloud_storage_conn_id='google_cloud_storage_default',
        bigquery_conn_id='bigquery_default')

    start_task >> gcs_to_bigquery_rides >> end_task

For reference, this is the rides_schema.json file, also found inside 'my_bucket', which holds the schema for the table to be created:

[
  {
    "mode": "NULLABLE",
    "name": "finish_picture_state",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "finish_picture_file_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "finish_reason",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "starting_battery_level",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "finished_at",
    "type": "TIMESTAMP"
  },
  {
    "mode": "NULLABLE",
    "name": "created_at",
    "type": "TIMESTAMP"
  },
  {
    "mode": "NULLABLE",
    "name": "ending_battery_level",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "state",
    "type": "STRING"
  },
  {
    "fields": [
      {
        "mode": "NULLABLE",
        "name": "currency",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "amount",
        "type": "INTEGER"
      }
    ],
    "mode": "NULLABLE",
    "name": "cost",
    "type": "RECORD"
  },
  {
    "mode": "NULLABLE",
    "name": "stoped_since",
    "type": "TIMESTAMP"
  },
  {
    "mode": "NULLABLE",
    "name": "user_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "minutes",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "vehicle_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "distance",
    "type": "FLOAT"
  },
  {
    "mode": "NULLABLE",
    "name": "service_area_id",
    "type": "STRING"
  },
  {
    "fields": [
      {
        "fields": [
          {
            "mode": "NULLABLE",
            "name": "currency",
            "type": "STRING"
          },
          {
            "mode": "NULLABLE",
            "name": "amount",
            "type": "INTEGER"
          }
        ],
        "mode": "NULLABLE",
        "name": "base",
        "type": "RECORD"
      },
      {
        "fields": [
          {
            "mode": "NULLABLE",
            "name": "currency",
            "type": "STRING"
          },
          {
            "mode": "NULLABLE",
            "name": "amount",
            "type": "INTEGER"
          }
        ],
        "mode": "NULLABLE",
        "name": "per_minute",
      }
    ],
    "mode": "NULLABLE",
    "name": "pricing",
    "type": "RECORD"
  },
  {
    "fields": [
      {
        "mode": "NULLABLE",
        "name": "m",
        "type": "FLOAT"
      },
      {
        "mode": "NULLABLE",
        "name": "latitude",
        "type": "FLOAT"
      },
      {
        "mode": "NULLABLE",
        "name": "longitude",
        "type": "FLOAT"
      }
    ],
    "mode": "REPEATED",
    "name": "path",
    "type": "RECORD"
  }
]

Your help is much appreciated, thanks!

Comments:

Welcome to StackOverflow. After looking into the source code of Airflow, it seems like the _parse_data function appears in BigQueryHook: github.com/apache/airflow/blob/1.9.0/airflow/contrib/hooks/…. This hook is used by GoogleCloudStorageToBigQueryOperator, but only on Airflow version 1.9.0. Is that the version you're running? By the way, can you reproduce the error in a local Airflow install, or does it only happen when deployed on Composer? – norbjd

Could you indicate which versions of pandas-gbq and apache-airflow caused this issue in your case? That could help others. – rsantiago

1 Answer


The _parse_data function was removed in pandas-gbq 0.10.0:

https://github.com/pydata/pandas-gbq/commit/ebcbfbe1fecc90ac9454751206115adcafe4ce24#diff-4db670026d33c02e5ad3dfbd5e4fd595L664

Airflow, in turn, stopped using _parse_data starting with version 1.10.0:

https://github.com/apache/airflow/commit/8ba86072f9c5ef81933cd6546e7e2f000f862053#diff-ee06f8fcbc476ea65446a30160c2a2b2L27
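To confirm which side of the boundary your environment is on, here is a minimal diagnostic sketch (assuming pandas-gbq is importable from the same Python that parses your DAGs; the import below mirrors the one Airflow 1.9.0's BigQueryHook performs, per the link in the comments):

import pandas_gbq

# The installed pandas-gbq version tells you whether you are before or
# after the 0.10.0 boundary.
print("pandas-gbq version:", pandas_gbq.__version__)

try:
    # Airflow < 1.10.0 imports _parse_data from pandas_gbq.gbq; on
    # pandas-gbq >= 0.10.0 this raises the ImportError behind the broken DAG.
    from pandas_gbq.gbq import _parse_data  # noqa: F401
    print("_parse_data is present: Airflow 1.9.0 can import BigQueryHook")
except ImportError:
    print("_parse_data was removed: Airflow < 1.10.0 will report a broken DAG")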

So the broken combination is Airflow below 1.10.0 together with pandas-gbq 0.10.0 or newer. To resolve the error, either:

  • Upgrade apache-airflow to version 1.10.0 or later, or

  • Downgrade pandas-gbq to a version below 0.10.0.
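
On Cloud Composer the Airflow version is tied to the environment's image, so pinning pandas-gbq is usually the easier of the two fixes. You can pin it on the environment with something like: gcloud composer environments update YOUR_ENVIRONMENT --location YOUR_LOCATION --update-pypi-package pandas-gbq==0.9.0 (the environment name, location, and the exact pre-0.10.0 version are placeholders to substitute).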