8
votes

Let me simplify my case. I'm using Apache Beam 0.6.0. My final processed result is PCollection<KV<String, String>>. And I want to write values to different files corresponding to their keys.

For example, let's say the result consists of

(key1, value1)
(key2, value2)
(key1, value3)
(key1, value4)

Then I want to write value1, value3 and value4 to key1.txt, and write value2 to key2.txt.

And in my case:

  • The key set is determined while the pipeline is running, not when the pipeline is constructed.
  • The key set may be quite small, but the number of values corresponding to each key may be very, very large.

Any ideas?

5
@GrahamPolley I think side outputs are decided at graph construction time, but my case requires pipeline running time. :-( – abcdabcd987
Yup, that's right. Beam doesn't support dynamic side outputs (or inputs) yet. – Graham Polley
@GrahamPolley Yeah, I know. issues.apache.org/jira/browse/BEAM-92 is still unresolved, so I'm wondering if there are some workarounds. – abcdabcd987
I don't believe there is a workaround to this. – Graham Polley

5 Answers

5
votes

Handily, I wrote a sample of this case just the other day.

This example is in the Dataflow 1.x style.

Basically, you group by each key, and then you write the grouped values with a custom transform that connects to Cloud Storage. The caveat is that the list of lines per file shouldn't be massive (it has to fit into memory on a single instance, but since you can run high-mem instances, that limit is pretty high).

    ...
    PCollection<KV<String, List<String>>> readyToWrite = groupedByFirstLetter
        .apply(Combine.perKey(AccumulatorOfWords.getCombineFn()));
    readyToWrite.apply(
        new PTransformWriteToGCS("dataflow-experiment", TonyWordGrouper::derivePath));
    ...
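
AccumulatorOfWords isn't shown in the snippet above; as a rough sketch (an assumption, not the author's actual code), a combine function that simply collects every value for a key into a List could look like this:

public static Combine.CombineFn<String, List<String>, List<String>> getCombineFn() {
    return new Combine.CombineFn<String, List<String>, List<String>>() {

        @Override
        public List<String> createAccumulator() {
            // Start with an empty list of values for the key.
            return new ArrayList<>();
        }

        @Override
        public List<String> addInput(List<String> accumulator, String input) {
            accumulator.add(input);
            return accumulator;
        }

        @Override
        public List<String> mergeAccumulators(Iterable<List<String>> accumulators) {
            // Concatenate the partial lists produced on different workers.
            List<String> merged = new ArrayList<>();
            for (List<String> accumulator : accumulators) {
                merged.addAll(accumulator);
            }
            return merged;
        }

        @Override
        public List<String> extractOutput(List<String> accumulator) {
            return accumulator;
        }
    };
}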

And then the transform doing most of the work is:

public class PTransformWriteToGCS
    extends PTransform<PCollection<KV<String, List<String>>>, PCollection<Void>> {

    private static final Logger LOG = Logging.getLogger(PTransformWriteToGCS.class);

    // One Cloud Storage client per worker JVM.
    private static final Storage STORAGE = StorageOptions.getDefaultInstance().getService();

    // Bucket that the per-key files are written into.
    private final String bucketName;

    // Maps a key to the object path its file should be written to.
    private final SerializableFunction<String, String> pathCreator;

    public PTransformWriteToGCS(final String bucketName,
        final SerializableFunction<String, String> pathCreator) {
        this.bucketName = bucketName;
        this.pathCreator = pathCreator;
    }

    @Override
    public PCollection<Void> apply(final PCollection<KV<String, List<String>>> input) {

        return input
            .apply(ParDo.of(new DoFn<KV<String, List<String>>, Void>() {

                @Override
                public void processElement(
                    final DoFn<KV<String, List<String>>, Void>.ProcessContext arg0)
                    throws Exception {
                    final String key = arg0.element().getKey();
                    final List<String> values = arg0.element().getValue();
                    // Join all values for this key into one newline-separated string.
                    final String toWrite = values.stream().collect(Collectors.joining("\n"));
                    final String path = pathCreator.apply(key);
                    final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, path)
                        .setContentType(MimeTypes.TEXT)
                        .build();
                    LOG.info("blob writing to: {}", blobInfo);
                    // Upload the whole file for this key as a single object.
                    STORAGE.create(blobInfo, toWrite.getBytes(StandardCharsets.UTF_8));
                }
            }));
    }
}
4
votes

Just write a loop in a ParDo function! More details: I had the same scenario today; the only difference is that in my case key=image_label and value=image_tf_record. So, like what you asked, I am trying to create separate TFRecord files, one per class, with each record file containing a number of images. HOWEVER, I'm not sure whether there might be memory issues when the number of values per key is very high, as in your scenario. (Also, my code is in Python.)

class WriteToSeparateTFRecordFiles(beam.DoFn):

    def __init__(self, outdir):
        self.outdir = outdir

    def process(self, element):
        l, image_list = element
        writer = tf.python_io.TFRecordWriter(self.outdir + "/tfr" + str(l) + '.tfrecord')
        for example in image_list:
            writer.write(example.SerializeToString())
        writer.close()

And then in your pipeline, just after the stage where you get the key-value pairs, add these two lines:

   (p
    | 'GroupByLabelId' >> beam.GroupByKey()
    | 'SaveToMultipleFiles' >> beam.ParDo(WriteToSeparateTFRecordFiles(outdir))
    )
3
votes

You can use FileIO.writeDynamic() for that:

PCollection<KV<String, String>> readfile = (something you read..);

readfile.apply(FileIO.<String, KV<String, String>>writeDynamic()
    .by(KV::getKey)
    .withDestinationCoder(StringUtf8Coder.of())
    .via(Contextful.fn(KV::getValue), TextIO.sink())
    .to("somefolder")
    .withNaming(key -> FileIO.Write.defaultNaming(key, ".txt")));

p.run();
2
votes

In the Apache Beam 2.2 Java SDK, this is natively supported in TextIO and AvroIO, using TextIO.write().to(DynamicDestinations) and AvroIO.write().to(DynamicDestinations) respectively.

Update (2018): Prefer to use FileIO.writeDynamic() together with TextIO.sink() and AvroIO.sink() instead.
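
For instance, a minimal sketch of that FileIO.writeDynamic() route applied to the question's PCollection<KV<String, String>> (essentially the same pattern as the snippet in the answer above; the output path is a placeholder):

// Route each element to a destination derived from its key, writing only the value.
PCollection<KV<String, String>> results = ...; // e.g. (key1, value1), (key2, value2), ...

results.apply(FileIO.<String, KV<String, String>>writeDynamic()
    .by(KV::getKey)                                   // one destination per key
    .withDestinationCoder(StringUtf8Coder.of())       // the destination (the key) is a String
    .via(Contextful.fn(KV::getValue), TextIO.sink())  // write the value as a text line
    .to("gs://some-bucket/output")                    // placeholder output directory
    .withNaming(key -> FileIO.Write.defaultNaming(key, ".txt")));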

-1
votes

Just write the lines below in your ParDo class:

from apache_beam.io import filesystems

# FileSystems.create opens a writable channel for the given path (GCS or local).
eventCSVFileWriter = filesystems.FileSystems.create(gcsFileName)
for record in list(Records):
    eventCSVFileWriter.write(record)
eventCSVFileWriter.close()  # flush and finalize the file

If you want the full code I can help you with that too.