Problem: I am trying to create a Cloud Dataflow pipeline that reads Avro files from Google Cloud Storage using the Python SDK, does some processing, and writes an Avro file back to Google Cloud Storage. After looking at some examples provided on the Apache Beam website, I tried running the code below, using the ReadFromAvro and WriteToAvro functions. All I am trying to do is read an Avro file and write the same Avro file back out with Dataflow, but it gives me the following warning and error and does not output an Avro file.
Warning/Error:
/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/io/gcp/gcsio.py:121: DeprecationWarning: object() takes no parameters
super(GcsIO, cls).__new__(cls, storage_client))
INFO:root:Starting the size estimation of the input
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Finished the size estimation of the input at 1 files. Estimation took 0.31790304184 seconds
Traceback (most recent call last):
  File "/Users/USER/PycharmProjects/GCP-gcs_to_bq/gcs-bq.py", line 52, in <module>
    run()
  File "/Users/USER/PycharmProjects/GCP-gcs_to_bq/gcs-bq.py", line 47, in run
    records | WriteToAvro(known_args.output)
TypeError: __init__() takes at least 3 arguments (2 given)
Code:
from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromAvro
from apache_beam.io import WriteToAvro
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def run(argv=None):
  parser = argparse.ArgumentParser()
  parser.add_argument('--input',
                      dest='input',
                      default='gs://BUCKET/000000_0.avro',
                      help='Input file to process.')
  parser.add_argument('--output',
                      dest='output',
                      default='gs://BUCKET/',
                      # required=True,
                      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_args.extend([
      # CHANGE 2/5: (OPTIONAL) Change this to DataflowRunner to
      # run your pipeline on the Google Cloud Dataflow Service.
      '--runner=DataflowRunner',
      # CHANGE 3/5: Your project ID is required in order to run your pipeline on
      # the Google Cloud Dataflow Service.
      '--project=PROJECT_NAME',
      # CHANGE 4/5: Your Google Cloud Storage path is required for staging local
      # files.
      '--staging_location=gs://BUCKET/staging',
      # CHANGE 5/5: Your Google Cloud Storage path is required for temporary
      # files.
      '--temp_location=gs://BUCKET/temp',
      '--job_name=parse-avro',
  ])
  pipeline_options = PipelineOptions(pipeline_args)
  p = beam.Pipeline(options=pipeline_options)

  # Read the avro file[pattern] into a PCollection.
  records = p | ReadFromAvro(known_args.input)
  records | WriteToAvro(known_args.output)


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
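From the traceback, my understanding is that WriteToAvro needs a schema in addition to the output path. A minimal sketch of just the write step, assuming the signature is WriteToAvro(file_path_prefix, schema, ...) as I believe the Beam Python SDK documents it (I have not verified this for my exact version; parsed_schema is a placeholder name, not something from my code):

  # Sketch only: WriteToAvro appears to require an Avro schema as its
  # second argument; parsed_schema would be an avro.schema.RecordSchema.
  records | WriteToAvro(known_args.output, schema=parsed_schema)

This is what I attempt in the edit below.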
Edit: I tried adding a schema to the WriteToAvro function, but it now gives me the following error:
Error:
/usr/local/bin/python /Users/USER/PycharmProjects/GCP-gcs_to_bq/gcs-bq.py
No handlers could be found for logger "oauth2client.contrib.multistore_file"
/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/coders/typecoders.py:135: UserWarning: Using fallback coder for typehint: <type 'NoneType'>.
warnings.warn('Using fallback coder for typehint: %r.' % typehint)
Schema:
{"fields": [{"default": null, "type": ["null", {"logicalType": "timestamp-millis", "type": "long"}], "name": "_col0"}, {"default": null, "type": ["null", {"logicalType": "char", "type": "string", "maxLength": 1}], "name": "_col1"}, {"default": null, "type": ["null", {"logicalType": "char", "type": "string", "maxLength": 1}], "name": "_col2"}, {"default": null, "type": ["null", {"logicalType": "char", "type": "string", "maxLength": 1}], "name": "_col3"}, {"default": null, "type": ["null", "long"], "name": "_col4"}, {"default": null, "type": ["null", {"logicalType": "char", "type": "string", "maxLength": 1}], "name": "_col5"}, {"default": null, "type": ["null", {"logicalType": "varchar", "type": "string", "maxLength": 10}], "name": "_col6"}, {"default": null, "type": ["null", "double"], "name": "_col7"}, {"default": null, "type": ["null", "long"], "name": "_col8"}, {"default": null, "type": ["null", {"logicalType": "varchar", "type": "string", "maxLength": 6}], "name": "_col9"}, {"default": null, "type": ["null", {"logicalType": "varchar", "type": "string", "maxLength": 6}], "name": "_col10"}], "type": "record", "name": "baseRecord"}
Code:
import avro.schema  # needed for avro.schema.parse below

pipeline_options = PipelineOptions(pipeline_args)
p = beam.Pipeline(options=pipeline_options)
schema = avro.schema.parse(open("avro.avsc", "rb").read())

# Read the avro file[pattern] into a PCollection.
records = p | ReadFromAvro(known_args.input)
records | WriteToAvro(known_args.output, schema=schema)
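For completeness, here is a self-contained sketch of what I am ultimately trying to run, with the parts cut off from the snippet above filled in. The copy_avro name is just for this sketch, and the file_name_suffix argument plus the explicit p.run() / wait_until_finish() call are my assumptions about how to get an .avro file actually written out, not something I have confirmed fixes the problem:

import avro.schema
import apache_beam as beam
from apache_beam.io import ReadFromAvro
from apache_beam.io import WriteToAvro
from apache_beam.options.pipeline_options import PipelineOptions


def copy_avro(input_path, output_prefix, pipeline_args):
  # Parse the Avro schema from a local .avsc file (Python 2 avro API).
  schema = avro.schema.parse(open("avro.avsc", "rb").read())

  p = beam.Pipeline(options=PipelineOptions(pipeline_args))

  # Read the Avro file(s) into a PCollection of dict records.
  records = p | ReadFromAvro(input_path)

  # Write the records back out. The schema is required, and file_name_suffix
  # should give the output shards an .avro extension (assumption on my part).
  records | WriteToAvro(output_prefix,
                        schema=schema,
                        file_name_suffix='.avro')

  # Run the pipeline and block until it finishes.
  p.run().wait_until_finish()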