We are using Apache Beam to process streaming data from a Pub/Sub source to a GCS sink with dynamic filenames. We can write text files, but not WAV files.

We can write out our byte-array string (LINEAR16 WAV encoding) using StringUtf8Coder, but we get a compilation error with ByteArrayCoder.

// THIS WORKS:

pipelineBeginStage
  .apply(
    FileIO.<String, KamiAppData>writeDynamic()
          .by((SerializableFunction<KamiAppData, String>) input -> input.GCSurl)
          .via(
             Contextful.fn((SerializableFunction<KamiAppData, String>) input -> input.audioStream),
             TextIO.sink())
          .to(outputBucket)
          .withNaming(url -> FileNaming.getNaming(url, "wav"))
          .withDestinationCoder(StringUtf8Coder.of())
          .withNumShards(1));

// THIS YIELDS A COMPILATION ERROR:

pipelineBeginStage
  .apply(
    FileIO.<String, KamiAppData>writeDynamic()
          .by((SerializableFunction<KamiAppData, String>) input -> input.GCSurl)
          .via(
            Contextful.fn((SerializableFunction<KamiAppData, byte[]>) input -> input.audioStream.getBytes()),
            TextIO.sink())
          .to(outputBucket)
          .withNaming(url -> FileNaming.getNaming(url, "wav"))
          .withDestinationCoder(ByteArrayCoder.of())
          .withNumShards(1));

The error we get is:

cannot resolve method 'via(org.apache.beam.sdk.transforms.Contextful.Fn<InputT,OutputT>>,org.apache.beam.sdk.TextIO.Sink)'

How do we make this compile and write the audio to file using Apache Beam APIs?

TextIO.sink() is a Sink<String> (which makes sense, since it is called "text" IO). The signature of FileIO's via() expects the output type of the Contextful fn to be the same as the type argument of the sink (which also makes sense: you want to write elements of the type that the fn produces). So the first example works because the fn outputs a String and the sink takes Strings. In the second example the fn outputs byte[] but the sink still takes Strings, so no via() overload matches. - Anton

1 Answer

Anton's explanation in the comment above looks very clear to me.

I'd just add that, for this to work with byte[], you may need to implement your own FileIO.Sink<byte[]> class and pass it to via() in place of TextIO.sink().
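
Here is a minimal sketch of what such a sink could look like. It is an illustration, not a tested Beam pipeline: the names ByteArraySinkSketch, ByteArraySink and writeAll are made up for this example, and the nested Sink interface is a local stand-in that only mirrors the three methods of FileIO.Sink (open, write, flush), so that the snippet compiles with nothing but the JDK. In a real pipeline you would drop the stand-in and declare the class as implements FileIO.Sink<byte[]>.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

class ByteArraySinkSketch {

  // Local stand-in with the same three methods as Beam's FileIO.Sink<ElementT>.
  // It exists only so this sketch compiles without Beam on the classpath.
  interface Sink<T> extends Serializable {
    void open(WritableByteChannel channel) throws IOException;
    void write(T element) throws IOException;
    void flush() throws IOException;
  }

  // Hypothetical sink that writes every byte[] element to the file verbatim,
  // with no delimiter and no text encoding step.
  static class ByteArraySink implements Sink<byte[]> {
    // transient: the channel is handed over in open() and is never serialized.
    private transient WritableByteChannel channel;

    @Override
    public void open(WritableByteChannel channel) {
      this.channel = channel;
    }

    @Override
    public void write(byte[] element) throws IOException {
      ByteBuffer buffer = ByteBuffer.wrap(element);
      // A channel may accept only part of the buffer per call, so loop.
      while (buffer.hasRemaining()) {
        channel.write(buffer);
      }
    }

    @Override
    public void flush() {
      // Nothing is buffered in this class, so there is nothing to flush.
    }
  }

  // Drives the sink against an in-memory channel to show the call order
  // open -> write... -> flush, and returns everything that was written.
  static byte[] writeAll(byte[]... elements) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ByteArraySink sink = new ByteArraySink();
    sink.open(Channels.newChannel(out));
    for (byte[] element : elements) {
      sink.write(element);
    }
    sink.flush();
    return out.toByteArray();
  }
}
```

With a sink like this, the second snippet in the question would pass new ByteArraySink() to via() instead of TextIO.sink(). Two further things are probably worth checking. The destination type is still String (the GCSurl), so withDestinationCoder most likely needs to stay StringUtf8Coder.of() rather than ByteArrayCoder.of(). And audioStream.getBytes() encodes the string with the platform default charset, which may not reproduce the original audio bytes; if the string is, say, Base64, it would have to be decoded instead.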