0
votes

I am trying to build a dataflow template.

The goal is to read ValueProvider that will tell me what files to read. Then for each files I need to read and enrich data with the object. I have tried this:

        p.apply(Create.of(options.getScheduleBatch()))
            .apply(ParDo.of(StringScheduleBatchToFileReceivedFn.of()))
            .apply(ParDo.of(new DoFn<FileReceived, PCollection<EventRow>>() {
                @ProcessElement
                public void process(ProcessContext c) {
                    FileReceived fileReceived = c.element();
                    Broker broker = configuration.getBroker(fileReceived.getBrokerId());
                    PCollection<EventRow> eventRows = p
                            .apply(TextIO.read().from(fileReceived.getUri()))
                            .apply(ParDo.of(StringToEventRowFn.of(broker, fileReceived, options.getJobName())));
                    c.output(eventRows);
                }
            }));

But I have the following error:

Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for org.apache.beam.sdk.values.PCollection.

I would love to find a better way than reading the file by myself using gcs client.

Do you have any tips ?

Best regards

1

1 Answers

1
votes

The problem:

You're trying to emit a PCollection as an output of your ParDo. This doesn't work.

Details:

PCollection is an abstraction that represents a potentially unbounded collection of elements. Applying a transformation to a PCollection gives you another PCollection. One of the transformations you can apply is a ParDo. ParDos make element-wise transforms. When applying a ParDo you're expressing - "take this PCollection and make another one by converting all elements within it by applying that ParDo".

One of the things that makes the processing effective is ability to execute everything in parallel, e.g. converting a lot of elements at once on multiple execution nodes (e.g. VMs/machines) by running the same ParDo on each against different elements. And you can't explicitly control whether any specific transform will happen on the same execution node or another one, it's part of the underlying system design how to optimize this. But to enable this you must be able to potentially pass elements around between execution nodes and persist them for aggregation. Beam supports this by requiring you to implement Coders for elements. Coders are a serialization mechanism that allows Beam to convert an element (represented by a java object) to a byte array which can then be passed to the next transformation (that can potentially happen on another machine) or storage. For example, Beam needs to be able to encode the elements that you output from a ParDo. Beam knows how to serialize some types, but it cannot infer everything automatically, you have to provide coders for something that cannot be inferred.

Your example looks like this: take some PCollection, and convert it into another PCollection by applying a ParDo to each element, and that ParDo transforms each input element into a PCollection. This means that as soon as element gets processed by a ParDo you have to encode it and pass it to the next transformation. And the question here is - how do you encode and pass a (potentially unbounded) PCollection to the next transform or persist it for aggregation?

Beam doesn't support thisat the moment, so you will need to choose another approach.

In your specific case I am not sure if in Beam out of the box you can simply use a stream of filenames and the convert them into sub-pipelines for processing the lines in the files.

Workarounds:

Few approaches I can think of to bypass this limitation:

  • If your file names have a known pattern, you can specify the pattern in TextIO and it can read the new files as they arrive.

  • If they don't have a known pattern, you can potentially write another pipeline to rename the files names so that they have common name pattern and then use the pattern in TextIO in another pipeline.

  • If feasible (e.g. files fit in memory), you could probably read the files contents with pure java File API, split them into rows and emit those rows in a single ParDo. Then you can apply the same StringToEventRowFn in the following ParDo.

Hope this helps