
Problem: I am trying to create a Cloud Dataflow pipeline that reads Avro files from Google Cloud Storage using the Python SDK, does some processing, and writes an Avro file back to Google Cloud Storage. After looking at some of the examples on the Apache Beam website, I tried running the following code, using the ReadFromAvro and WriteToAvro functions. All I am trying to do is read an Avro file and write the same Avro file back using Dataflow, but it gives me the warning below and does not output an Avro file.

Warning/Error:

/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/io/gcp/gcsio.py:121: DeprecationWarning: object() takes no parameters
  super(GcsIO, cls).__new__(cls, storage_client))
INFO:root:Starting the size estimation of the input
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Finished the size estimation of the input at 1 files. Estimation took 0.31790304184 seconds
Traceback (most recent call last):
  File "/Users/USER/PycharmProjects/GCP-gcs_to_bq/gcs-bq.py", line 52, in <module>
    run()
  File "/Users/USER/PycharmProjects/GCP-gcs_to_bq/gcs-bq.py", line 47, in run
    records | WriteToAvro(known_args.output)
TypeError: __init__() takes at least 3 arguments (2 given)

Code:

from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromAvro
from apache_beam.io import WriteToAvro
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://BUCKET/000000_0.avro',
                        help='Input file to process.')
    parser.add_argument('--output',
                        dest='output',
                        default='gs://BUCKET/',
                        #required=True,
                        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
        # CHANGE 2/5: (OPTIONAL) Change this to DataflowRunner to
        # run your pipeline on the Google Cloud Dataflow Service.
        '--runner=DataflowRunner',
        # CHANGE 3/5: Your project ID is required in order to run your pipeline on
        # the Google Cloud Dataflow Service.
        '--project=PROJECT_NAME',
        # CHANGE 4/5: Your Google Cloud Storage path is required for staging local
        # files.
        '--staging_location=gs://BUCKET/staging',
        # CHANGE 5/5: Your Google Cloud Storage path is required for temporary
        # files.
        '--temp_location=gs://BUCKET/temp',
        '--job_name=parse-avro',
    ])
    pipeline_options = PipelineOptions(pipeline_args)
    p = beam.Pipeline(options=pipeline_options)

    # Read the avro file[pattern] into a PCollection.
    records = p | ReadFromAvro(known_args.input)
    records | WriteToAvro(known_args.output)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Edit:

I tried adding a schema to the WriteToAvro function, but it now gives me the following error:

Error:

/usr/local/bin/python /Users/USER/PycharmProjects/GCP-gcs_to_bq/gcs-bq.py
No handlers could be found for logger "oauth2client.contrib.multistore_file"

/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/coders/typecoders.py:135: UserWarning: Using fallback coder for typehint: <type 'NoneType'>.
  warnings.warn('Using fallback coder for typehint: %r.' % typehint)

Schema:

{"fields": [{"default": null, "type": ["null", {"logicalType": "timestamp-millis", "type": "long"}], "name": "_col0"}, {"default": null, "type": ["null", {"logicalType": "char", "type": "string", "maxLength": 1}], "name": "_col1"}, {"default": null, "type": ["null", {"logicalType": "char", "type": "string", "maxLength": 1}], "name": "_col2"}, {"default": null, "type": ["null", {"logicalType": "char", "type": "string", "maxLength": 1}], "name": "_col3"}, {"default": null, "type": ["null", "long"], "name": "_col4"}, {"default": null, "type": ["null", {"logicalType": "char", "type": "string", "maxLength": 1}], "name": "_col5"}, {"default": null, "type": ["null", {"logicalType": "varchar", "type": "string", "maxLength": 10}], "name": "_col6"}, {"default": null, "type": ["null", "double"], "name": "_col7"}, {"default": null, "type": ["null", "long"], "name": "_col8"}, {"default": null, "type": ["null", {"logicalType": "varchar", "type": "string", "maxLength": 6}], "name": "_col9"}, {"default": null, "type": ["null", {"logicalType": "varchar", "type": "string", "maxLength": 6}], "name": "_col10"}], "type": "record", "name": "baseRecord"}

Code:

pipeline_options = PipelineOptions(pipeline_args)
p = beam.Pipeline(options=pipeline_options)

schema = avro.schema.parse(open("avro.avsc", "rb").read())

# Read the avro file[pattern] into a PCollection.
records = p | ReadFromAvro(known_args.input)
records | WriteToAvro(known_args.output, schema=schema)
Comments:

rf: Did you authenticate with gcloud?
kaxil: @rf Yes, I did. The wordcount example works completely fine.
rf: I assume the permissions on the bucket and everything else look good as well. Can you confirm?
kaxil: Yes, the permissions on the bucket and other components are set and working fine.

2 Answers

Answer 1 (2 votes):

The problem was that the data pipeline was never actually executed: chaining transforms with | only builds the pipeline graph, and nothing happens until the pipeline itself is run. I managed to fix it. The solution is to run the Beam pipeline in either one of the following two ways:

Option 1:

p = beam.Pipeline(options=pipeline_options)

schema = avro.schema.parse(open("avro.avsc", "rb").read())

records = p | 'Read from Avro' >> ReadFromAvro(known_args.input)

# Write the file
records | 'Write to Avro' >> WriteToAvro(known_args.output, schema=schema, file_name_suffix='.avro')

# Run the pipeline
result = p.run()
result.wait_until_finish()

Option 2: Use Python's with statement to execute the pipeline (the pipeline is run and waited on automatically when the with block exits):

schema = avro.schema.parse(open("avro.avsc", "rb").read())

with beam.Pipeline(options=pipeline_options) as p:
    records = p | ReadFromAvro(known_args.input)
    records | WriteToAvro(known_args.output, schema=schema, file_name_suffix='.avro')

Answer 2 (1 vote):

The error says that your code didn't pass all the required arguments to the constructor of the WriteToAvro() transform: it requires at least two arguments (a filename prefix and a schema), but this code passes only one (the filename prefix).

WriteToAvro currently requires the schema: it is not an optional parameter, and there is no workaround to avoid specifying it. The reason is that Avro files in general require knowing the schema before the file is created, so WriteToAvro has to know it too.

Moreover, the schema cannot be unambiguously inferred from the collection returned by ReadFromAvro: imagine that a user passes as --input a filepattern that matches Avro files with several different schemas; which of those schemas would WriteToAvro then have to use?
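
For completeness, here is one way to satisfy that requirement when all of your input files share a single schema. This is only a sketch, assuming the avro package already used in the question and a locally downloaded copy of one input file (the file name below is hypothetical): read the writer schema out of that file and pass it to WriteToAvro explicitly.

import avro.schema
from avro.datafile import DataFileReader
from avro.io import DatumReader

# Open a local copy of one input file (hypothetical name) and pull out the
# writer schema, which Avro stores under the 'avro.schema' metadata key.
with open('sample.avro', 'rb') as f:
    reader = DataFileReader(f, DatumReader())
    schema = avro.schema.parse(reader.get_meta('avro.schema'))
    reader.close()

# Then pass that schema explicitly inside the pipeline, as in the answer above:
# records | WriteToAvro(known_args.output, schema=schema, file_name_suffix='.avro')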