Using google-cloud-dataflow/Cloud Composer for CSV to Avro and everything works on my local environment. When trying to read the .avsc file that contains the Avro schema from a Cloud Storage bucket, I keep getting: IOError: [Errno 2] No such file or directory:'gs://my-bucket/xxx.avsc'
Code:
from __future__ import absolute_import
import argparse
import logging
import ntpath
import avro.schema
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import SetupOptions
from datetime import datetime
class RowTransformer(object):
def __init__(self, delimiter, header, filename):
self.delimiter = delimiter
self.keys = re.split(',', header)
self.filename = filename
def parse(self, row):
self.load_dt = datetime.utcnow()
split_row = row.split(self.delimiter)
#Need to cast anything that is not a string into proper type
split_row[8] = float('0' if not split_row[8] else split_row[8])
split_row[9] = float('0' if not split_row[9] else split_row[9])
split_row[10] = float('0' if not split_row[10] else split_row[10])
split_row[11] = float('0' if not split_row[11] else split_row[11])
split_row[12] = float('0' if not split_row[12] else split_row[12])
split_row[13] = float('0' if not split_row[13] else split_row[13])
split_row[14] = float('0' if not split_row[14] else split_row[14])
split_row[15] = float('0' if not split_row[15] else split_row[15])
split_row[16] = float('0' if not split_row[16] else split_row[16])
split_row[17] = float('0' if not split_row[17] else split_row[17])
split_row[18] = str('0' if not split_row[18] else split_row[18])
split_row[19] = str('0' if not split_row[19] else split_row[19])
split_row.append(self.filename)
split_row.append(self.load_dt.strftime('%Y-%m-%d %H:%M:%S.%f'))
decode_row = [i.decode('UTF-8') if isinstance(i, basestring) else i for i in split_row]
row = dict(zip(self.keys, decode_row))
return row
def run(argv=None):
"""The main function which creates the pipeline and runs it."""
parser = argparse.ArgumentParser()
parser.add_argument('--input', dest='input', required=False,
help='Input file to read. This can be a local file or '
'a file in a Google Storage Bucket.',
default='gs://my-bucket/receive/xxx.txt')
parser.add_argument('--output', dest='output', required=False,
help='Output Avro to Cloud Storage',
default='gs://my-bucket/')
parser.add_argument('--schema', dest='schema', required=False,
help='Avro Schema',
default='gs://my-bucket/xxx.avsc')
parser.add_argument('--delimiter', dest='delimiter', required=False,
help='Delimiter to split input records.',
default='|')
parser.add_argument('--fields', dest='fields', required=False,
help='list of field names expected',
default='Col1,Col2...etc')
known_args, pipeline_args = parser.parse_known_args(argv)
row_transformer = RowTransformer(delimiter=known_args.delimiter,
header=known_args.fields,
filename=ntpath.basename(known_args.input))
p_opts = pipeline_options.PipelineOptions(pipeline_args)
with beam.Pipeline(options=p_opts) as pipeline:
schema_file = avro.schema.parse(open(known_args.schema, "rb").read())
rows = pipeline | "Read from text file" >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
dict_records = rows | "Convert to Avro" >> beam.Map(lambda r: row_transformer.parse(r))
dict_records | "Write to Cloud Storage as Avro" >> beam.io.avroio.WriteToAvro(known_args.output,schema=schema_file)
run()