The problem:

You're trying to emit a `PCollection` as an output of your `ParDo`. This doesn't work.
Details:

A `PCollection` is an abstraction that represents a potentially unbounded collection of elements. Applying a transformation to a `PCollection` gives you another `PCollection`. One of the transformations you can apply is a `ParDo`. `ParDo`s perform element-wise transforms: when applying a `ParDo` you're expressing "take this `PCollection` and make another one by converting each element within it with that `ParDo`".
One of the things that makes the processing efficient is the ability to execute everything in parallel, e.g. converting a lot of elements at once on multiple execution nodes (e.g. VMs/machines) by running the same `ParDo` on each node against different elements. You can't explicitly control whether any specific transform will happen on the same execution node or another one; how to optimize this is part of the underlying system design. But to enable this, it must be possible to pass elements around between execution nodes and to persist them for aggregation. Beam supports this by requiring you to implement `Coder`s for elements. Coders are a serialization mechanism that allows Beam to convert an element (represented by a Java object) to a byte array, which can then be passed to the next transformation (potentially happening on another machine) or to storage. For example, Beam needs to be able to encode the elements that you output from a `ParDo`. Beam knows how to serialize some types, but it cannot infer everything automatically; you have to provide coders for anything that cannot be inferred.
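As a concrete sketch, explicitly attaching a coder when inference fails might look like this; `EventRow` and `input` stand in for types from your pipeline, and `EventRow` is assumed to implement `Serializable`:

```java
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// If Beam can't infer a coder for EventRow, set one explicitly on the
// output PCollection. SerializableCoder falls back to Java serialization
// (so EventRow must implement Serializable); a hand-written Coder is
// usually more efficient.
PCollection<EventRow> rows = input
    .apply(ParDo.of(new StringToEventRowFn()))
    .setCoder(SerializableCoder.of(EventRow.class));
```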
Your example looks like this: take some `PCollection`, and convert it into another `PCollection` by applying a `ParDo` to each element, where that `ParDo` transforms each input element into a `PCollection`. This means that as soon as an element gets processed by the `ParDo`, you have to encode it and pass it to the next transformation. And the question here is: how do you encode and pass a (potentially unbounded) `PCollection` to the next transform, or persist it for aggregation?
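In code, the shape that cannot work looks roughly like this (an illustration of the anti-pattern, not something runnable; `fileNames` is a made-up `PCollection<String>`):

```java
// The shape that does NOT work: a DoFn whose output type is itself a
// PCollection. To hand these outputs to the next transform, Beam would
// need a Coder<PCollection<String>>, and no such coder can exist.
PCollection<PCollection<String>> broken = fileNames.apply(
    ParDo.of(new DoFn<String, PCollection<String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // ... build a whole sub-pipeline's output per file name here ...
        // This per-element "pipeline within a pipeline" is the
        // unsupported part.
      }
    }));
```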
Beam doesn't support this at the moment, so you will need to choose another approach.

In your specific case, I am not sure whether Beam out of the box lets you simply take a stream of filenames and turn each one into a sub-pipeline that processes the lines of that file.
Workarounds:

A few approaches I can think of to bypass this limitation:
If your file names have a known pattern, you can specify the pattern in `TextIO` and it can read the new files as they arrive.
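For example, in recent Beam versions `TextIO` can watch a file pattern for new matches; a sketch with a made-up path and poll interval:

```java
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Read all files matching the pattern and keep polling for new ones.
// The bucket path and the polling interval are made-up examples.
PCollection<String> lines = pipeline.apply(
    TextIO.read()
        .from("gs://my-bucket/input/*.csv")
        .watchForNewFiles(
            Duration.standardMinutes(1),   // how often to poll
            Watch.Growth.never()));        // keep watching indefinitely
```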
If they don't have a known pattern, you can potentially write another pipeline to rename the files so that they share a common name pattern, and then use that pattern in `TextIO` in another pipeline.
If feasible (e.g. the files fit in memory), you could probably read the file contents with the plain Java `File` API, split them into rows, and emit those rows in a single `ParDo`. Then you can apply the same `StringToEventRowFn` in the following `ParDo`.
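A minimal sketch of that idea (the class name is mine, and it assumes individual files are small enough to read on a single worker):

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.beam.sdk.transforms.DoFn;

// Takes a file name as input and emits each line of that file as a
// separate element. Only reasonable if individual files are small.
static class ReadFileLinesFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    for (String line :
        Files.readAllLines(Paths.get(c.element()), StandardCharsets.UTF_8)) {
      c.output(line);
    }
  }
}

// Usage: fileNames is a PCollection<String> of file paths.
// PCollection<String> rows = fileNames.apply(ParDo.of(new ReadFileLinesFn()));
// PCollection<EventRow> events = rows.apply(ParDo.of(new StringToEventRowFn()));
```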
Hope this helps