
I'm running batch pipelines in Google Cloud Dataflow. I need to read objects in one pipeline that another pipeline has previously written. The easiest way to serialize the objects is pickle / dill.

The writing works well, producing a number of files, each with a pickled object. When I download a file manually, I can unpickle it. Code for writing: beam.io.WriteToText('gs://{}', coder=coders.DillCoder())

But the reading breaks every time, with one of the errors below. Code for reading: beam.io.ReadFromText('gs://{}*', coder=coders.DillCoder())

Either...

  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
KeyError: '\x90'

...or...

  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named measur

(the object's class sits in a path containing measure, though I'm not sure why the last character is dropped there)

I've tried using the default coder, and a BytesCoder, and pickling & unpickling as a custom task in the pipeline.

My working hypothesis is that the reader splits the file by line, and so treats a single pickle (which contains newlines) as multiple objects. If so, is there a way of avoiding that?
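(For what it's worth, a quick check shows that a binary pickle can indeed contain newline bytes, so the snippet below is only illustrating the hypothesis:)

import pickle

# The integer 10 pickles to '\x80\x02K\n.' under protocol 2: the byte 0x0A
# (newline) is the value itself, not a record separator.
data = pickle.dumps(10, protocol=2)
assert '\n' in data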

I could attempt to build a reader myself, but I'm hesitant since this seems like a well-solved problem (e.g. Beam already has a format to move objects from one pipeline stage to another).

Tangentially related: How to read blob (pickle) files from GCS in a Google Cloud DataFlow job?

Thank you!


2 Answers

1 vote

ReadFromText is designed to read newline-separated records in text files, so it is not suitable for your use case. Implementing FileBasedSource is not a good solution either, since it's designed for reading large files with multiple records (and usually splits those files into shards for parallel processing). So, in your case, the current best solution for the Python SDK is to implement a source yourself. This can be as simple as a ParDo that reads files and produces a PCollection of records (see the sketch below). If your ParDo produces a large number of records, consider adding an apache_beam.transforms.util.Reshuffle step after it, which will allow runners to parallelize the following steps better. For the Java SDK we have FileIO, which already provides transforms to make this a bit easier.
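A minimal sketch of such a ParDo-based source, assuming one pickled object per file; the class name, bucket path, and use of FileSystems/dill here are illustrative rather than prescribed by Beam:

import dill
import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.transforms.util import Reshuffle


class ReadPickledObjects(beam.DoFn):
    """Expands a glob, opens each matched file and yields the pickled object inside."""

    def process(self, file_pattern):
        match_result = FileSystems.match([file_pattern])[0]
        for metadata in match_result.metadata_list:
            with FileSystems.open(metadata.path) as f:
                yield dill.load(f)


with beam.Pipeline() as p:
    objects = (
        p
        | beam.Create(['gs://my-bucket/pickles/*'])   # hypothetical path
        | beam.ParDo(ReadPickledObjects())
        | Reshuffle()   # lets the runner rebalance work for the downstream steps
    )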

0 votes

Encoding as string_escape escapes the newlines, so the only newlines that Beam sees are those between pickles:

from apache_beam.coders import coder_impl
from apache_beam.coders.coders import DillCoder, maybe_dill_dumps, maybe_dill_loads


class DillMultiCoder(DillCoder):
    """
    Coder that allows multi-line pickles to be read.

    After an object is pickled, the bytes are encoded with `string_escape`
    (`unicode_escape` on Python 3), so newline characters (`\n`) don't appear
    in the string.

    Previously, the presence of newline characters confused the Dataflow
    reader, as it couldn't discriminate between a new object and a newline
    within a pickle string.
    """

    def _create_impl(self):
        return coder_impl.CallbackCoderImpl(
            maybe_dill_multi_dumps, maybe_dill_multi_loads)


def maybe_dill_multi_dumps(o):
    # in Py3 this needs to be `unicode_escape`
    return maybe_dill_dumps(o).encode('string_escape')


def maybe_dill_multi_loads(o):
    # in Py3 this needs to be `unicode_escape`
    return maybe_dill_loads(o.decode('string_escape'))
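For reference, the escaping round-trips cleanly on Python 2, which is what keeps the reader from seeing record boundaries inside a pickle (a toy illustration, not part of the coder):

# Python 2: string_escape replaces the raw newline byte with the two
# characters backslash and 'n', and decoding reverses it exactly.
raw = 'line one\nline two'
escaped = raw.encode('string_escape')     # 'line one\\nline two'
assert '\n' not in escaped
assert escaped.decode('string_escape') == raw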

For large pickles, I also needed to set the buffer size much higher, to 8MB. With the previous 8kB buffer, a 120MB file spun for 2 days of CPU time:

from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.textio import ReadFromText, _TextSource


class ReadFromTextPickle(ReadFromText):
    """
    Same as ReadFromText, but with a much bigger buffer. With the standard 8KB
    buffer, large files can be read in a loop and never finish.

    Also defaults the coder to DillMultiCoder.
    """

    def __init__(
            self,
            file_pattern=None,
            min_bundle_size=0,
            compression_type=CompressionTypes.AUTO,
            strip_trailing_newlines=True,
            coder=DillMultiCoder(),
            validate=True,
            skip_header_lines=0,
            **kwargs):
        # calling the parent __init__ needs commenting out, not sure why
        # super(ReadFromTextPickle, self).__init__(**kwargs)
        self._source = _TextSource(
            file_pattern,
            min_bundle_size,
            compression_type,
            strip_trailing_newlines=strip_trailing_newlines,
            coder=coder,
            validate=validate,
            skip_header_lines=skip_header_lines,
            buffer_size=8000000)
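A hypothetical usage, with the bucket path made up; it drops in where ReadFromText would normally go:

import apache_beam as beam

with beam.Pipeline() as p:
    pickles = p | 'ReadPickles' >> ReadFromTextPickle('gs://my-bucket/pickles/*')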

Another approach would be to implement a PickleFileSource inheriting from FileBasedSource and call pickle.load on the file, with each call yielding a new object. But there's a bunch of complication around offset_range_tracker that looked like more lift than strictly necessary; a rough sketch is below.
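The sketch assumes that marking the source non-splittable sidesteps most of the range-tracker complication; the class name and details are illustrative and untested, and dill.load is used since the files were written with DillCoder:

import dill
from apache_beam.io import filebasedsource
from apache_beam.io.filesystems import FileSystems


class PickleFileSource(filebasedsource.FileBasedSource):
    def __init__(self, file_pattern):
        # splittable=False: each file is one opaque pickle stream, so Beam
        # shouldn't try to split it at arbitrary byte offsets.
        super(PickleFileSource, self).__init__(file_pattern, splittable=False)

    def read_records(self, file_name, offset_range_tracker):
        # For a non-splittable source the whole file is a single range; claim
        # its start once, then stream objects out until EOF.
        if not offset_range_tracker.try_claim(offset_range_tracker.start_position()):
            return
        with FileSystems.open(file_name) as f:
            while True:
                try:
                    yield dill.load(f)
                except EOFError:
                    break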