2
votes

I am "playing" with Apache Beam/Dataflow in Datalab. I am trying to read a CSV file from GCS. When I create the PCollection using:

lines = p | 'ReadMyFile' >> beam.io.ReadFromText('gs://' + BUCKET_NAME + '/' + input_file, coder='StrUtf8Coder')

I get the following error:

LookupError: unknown encoding: "THE","NAME","OF","COLUMNS"

It seems the column names are being interpreted as an encoding?

I do not understand what's wrong. If I do not specify the "coder", I get

UnicodeDecodeError: 'utf8' codec can't decode byte 0xe0 in position 1045: invalid continuation byte

Outside Apache Beam I am able to handle this error by reading the file from GCS:

blob = storage.Blob(gs_path, bucket)
data = blob.download_as_string()
data.decode('utf-8', 'ignore')

I read that Apache Beam only supports UTF-8, and the file does not contain only UTF-8.

Should I download the file and then convert it to a PCollection?

Any suggestions?


2 Answers

6
votes

A possible hack is to create a class that inherits from the Coder class (apache_beam.coders.coders.Coder):

from apache_beam.coders.coders import Coder

class ISOCoder(Coder):
    """A coder used for reading and writing strings as ISO-8859-1."""

    def encode(self, value):
        return value.encode('iso-8859-1')

    def decode(self, value):
        return value.decode('iso-8859-1')

    def is_deterministic(self):
        return True

and pass an instance of it as the coder argument to the ReadFromText IO transform (apache_beam.io.textio.ReadFromText) provided by Beam, like this:

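import apache_beam as beam
from apache_beam.io import ReadFromText

# pipeline_options is assumed to be defined elsewhere; 'input_file' stands in for the gs:// path.
with beam.Pipeline(options=pipeline_options) as p:
    new_pcollection = (
        p
        | 'Read From GCS' >> ReadFromText('input_file', coder=ISOCoder())
    )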

The logic behind this is detailed here:

https://medium.com/@khushboo_16578/cloud-dataflow-and-iso-8859-1-2bb8763cc7c8
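
To see why decoding as ISO-8859-1 sidesteps the UnicodeDecodeError from the question, here is a minimal sketch in plain Python, without Beam. The sample line is made up for illustration, and it assumes the file really is ISO-8859-1, which the question does not confirm. In that encoding the byte 0xe0 is "à", while in UTF-8 it starts a multi-byte sequence and is invalid when followed by a comma.

```python
# Hypothetical sample line: "città,roma" stored as ISO-8859-1,
# so "à" is the single byte 0xe0 (the byte named in the error message).
raw = "citt\u00e0,roma".encode("iso-8859-1")

# Strict UTF-8 decoding fails on that byte.
try:
    raw.decode("utf-8")
    utf8_error = None
except UnicodeDecodeError as exc:
    utf8_error = exc.reason

# errors='ignore' succeeds, but silently drops the character.
lossy = raw.decode("utf-8", "ignore")

# ISO-8859-1 maps every possible byte to a character, so decoding never fails.
text = raw.decode("iso-8859-1")

# Every byte value survives a decode/encode round trip.
all_bytes = bytes(range(256))
round_trip_ok = all_bytes.decode("iso-8859-1").encode("iso-8859-1") == all_bytes

print(repr(lossy), repr(text), round_trip_ok)
```

Because ISO-8859-1 never raises, it will also happily decode a file that is actually in some other single-byte encoding (Windows-1252, for example) and produce wrong characters, so it is worth checking a few decoded lines by eye.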

1
votes

I would suggest changing the encoding of the actual file. If you save the file with "Save As", you can select UTF-8 encoding as the format for Excel CSVs and regular .txt files. Once you do that, you need to make sure you add a line of code like

class DoWork(beam.DoFn):
  def process(self, text):
    # Encode the element passed to process(), not the PCollection itself.
    text = text.encode('utf-8')

    # Do other stuff

This isn't how I would like to do it because it isn't code-centric, but it has worked for me before. Unfortunately, I don't have a code-centric solution.
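
For what it's worth, the "Save As" step can be scripted. The following is only a sketch: it assumes the source file is ISO-8859-1 and that a local copy is available (a file in GCS would have to be downloaded first). The helper name transcode_to_utf8 and the file names are hypothetical.

```python
import os
import tempfile


def transcode_to_utf8(src_path, dst_path, src_encoding="iso-8859-1"):
    """Rewrite src_path as UTF-8 at dst_path, reading it with src_encoding."""
    # newline="" keeps the original line endings untouched on both sides.
    with open(src_path, "r", encoding=src_encoding, newline="") as src, \
         open(dst_path, "w", encoding="utf-8", newline="") as dst:
        for line in src:
            dst.write(line)


# Usage with a small made-up CSV written in ISO-8859-1.
workdir = tempfile.mkdtemp()
src = os.path.join(workdir, "input_latin1.csv")
dst = os.path.join(workdir, "input_utf8.csv")
with open(src, "wb") as f:
    f.write("name,city\nanna,citt\u00e0\n".encode("iso-8859-1"))

transcode_to_utf8(src, dst)

with open(dst, "rb") as f:
    converted = f.read()

# The converted bytes now decode cleanly as UTF-8.
print(converted.decode("utf-8"))
```

Once the file is UTF-8 on disk, ReadFromText should be able to read it with its default coder, so no extra encoding step is needed inside the pipeline.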