We are using Apache Beam to process streamed data from a Pub/Sub source to a GCS sink with dynamic filenames. We are able to write text files, but not WAV files.
We can write out our byte array string (LINEAR16 WAV encoding) using `StringUtf8Coder`, but we get a compilation error when we switch to `ByteArrayCoder`.
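The question does not say how `audioStream` is produced. If, as an assumption, the raw LINEAR16 bytes were decoded into a Java `String` as UTF-8, the audio may already be damaged before any sink sees it: byte sequences that are not valid UTF-8 are replaced with U+FFFD, so calling `getBytes()` later cannot recover the original samples. The sketch below (class and method names are hypothetical) shows that loss on two 16-bit samples, and uses Base64 as one lossless way to carry bytes as text.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

public class PcmStringRoundTrip {
  /** Decodes the bytes as UTF-8 into a String and encodes them back, as a String field would. */
  static byte[] viaUtf8String(byte[] pcm) {
    String asText = new String(pcm, StandardCharsets.UTF_8); // invalid sequences become U+FFFD
    return asText.getBytes(StandardCharsets.UTF_8);
  }

  /** Base64 maps arbitrary bytes to ASCII text and back without loss. */
  static byte[] viaBase64String(byte[] pcm) {
    String asText = Base64.getEncoder().encodeToString(pcm);
    return Base64.getDecoder().decode(asText);
  }

  public static void main(String[] args) {
    // Two little-endian 16-bit PCM samples: -1 (0xFFFF) and 128 (0x0080).
    byte[] pcm = {(byte) 0xFF, (byte) 0xFF, (byte) 0x80, 0x00};
    System.out.println(Arrays.equals(pcm, viaUtf8String(pcm)));   // false
    System.out.println(Arrays.equals(pcm, viaBase64String(pcm))); // true
  }
}
```

If the payload arrives as Base64 text, decoding it to `byte[]` before writing keeps the samples intact.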
```java
// THIS WORKS:
pipelineBeginStage
    .apply(
        FileIO.<String, KamiAppData>writeDynamic()
            .by((SerializableFunction<KamiAppData, String>) input -> input.GCSurl)
            .via(
                Contextful.fn((SerializableFunction<KamiAppData, String>) input -> input.audioStream),
                TextIO.sink())
            .to(outputBucket)
            .withNaming(url -> FileNaming.getNaming(url, "wav"))
            .withDestinationCoder(StringUtf8Coder.of())
            .withNumShards(1));
```
```java
// THIS YIELDS A COMPILATION ERROR:
pipelineBeginStage
    .apply(
        FileIO.<String, KamiAppData>writeDynamic()
            .by((SerializableFunction<KamiAppData, String>) input -> input.GCSurl)
            .via(
                Contextful.fn((SerializableFunction<KamiAppData, byte[]>) input -> input.audioStream.getBytes()),
                TextIO.sink())
            .to(outputBucket)
            .withNaming(url -> FileNaming.getNaming(url, "wav"))
            .withDestinationCoder(ByteArrayCoder.of())
            .withNumShards(1));
```
The error we get is:
```
cannot resolve method 'via(org.apache.beam.sdk.transforms.Contextful.Fn<InputT,OutputT>>,org.apache.beam.sdk.TextIO.Sink)'
```
How do we make this compile and write the audio to file using Apache Beam APIs?
`TextIO.sink()` is a `Sink<String>` (which makes sense, since it is called "text" IO). The signature of `FileIO.via()` expects the output of the `contextfulFn` to have the same type as the type argument of the sink (which also makes sense, since you want to write elements of the same type that the `contextfulFn` produces). So the first example works because the output of the `contextfulFn` is a `String` and the sink also takes strings. In the second example you want to output bytes, but the sink takes strings, which doesn't compile. – Anton
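Following that comment, one way to make the types line up is a sink whose type argument is `byte[]`. Below is a minimal sketch of a `FileIO.Sink<byte[]>` that copies each element to the output channel unchanged. This differs from `TextIO.sink()`, which writes each element as UTF-8 text followed by a line delimiter. The class name `RawBytesSink` is hypothetical, and the sketch assumes each destination receives a single element holding a complete WAV payload, because the sink appends elements back to back with no separator.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.io.FileIO;

/** Hypothetical sink that writes each byte[] element verbatim, with no delimiter. */
public class RawBytesSink implements FileIO.Sink<byte[]> {
  // The channel is per-writer runtime state, so it is excluded from serialization.
  private transient WritableByteChannel channel;

  @Override
  public void open(WritableByteChannel channel) throws IOException {
    this.channel = channel;
  }

  @Override
  public void write(byte[] element) throws IOException {
    ByteBuffer buffer = ByteBuffer.wrap(element);
    // WritableByteChannel.write may write fewer bytes than remain, so loop until done.
    while (buffer.hasRemaining()) {
      channel.write(buffer);
    }
  }

  @Override
  public void flush() throws IOException {
    // Nothing is buffered in this class; closing the channel is left to FileIO.
  }
}
```

To use it, pass `new RawBytesSink()` to `.via(...)` in place of `TextIO.sink()` in the second pipeline, and keep `.withDestinationCoder(StringUtf8Coder.of())`. The destination coder encodes the `String` returned by `.by(...)` (the GCS URL), not the file contents, so `ByteArrayCoder` does not type-check there either.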