4
votes

I am trying to run my Beam code on Spark for a POC. I am running the application on Google Cloud Dataproc for testing. It is a very simple test: read from a Pub/Sub topic and write the messages to a bucket on Google Cloud Storage. The Dataproc cluster has the right Spark version and is enabled to access the other GCP APIs.

I tried FileIO as well, but that did not work either. Publishing to another Pub/Sub topic instead of writing did work, but that is not my use case. Printing the messages before the TextIO write confirmed that I can read from Pub/Sub.

Here is the pipeline:

PCollection<String> messages = pipeline
    .apply(PubsubIO.readStrings().fromSubscription(sub))
    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))));

messages.apply(TextIO.write().to("gs://...").withNumShards(1).withWindowedWrites());

pipeline.run();

I don't see any logs in the Dataproc job output, no errors or anything at all, and there is no file in the bucket either.

1
I just tried and TextIO does not work with FlinkRunner either. - hbk

1 Answer

2
votes

I found that this was a triggering problem. Here is the detailed discussion:
https://lists.apache.org/thread.html/a831da3cd74159bf0e0f3fe77363b022cde943ba40c6ab68bb33d5bb@%3Cuser.beam.apache.org%3E

I fixed it by adding an early-firing trigger to my windowing transform:

.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .alignedTo(Duration.standardSeconds(10))))
    .withAllowedLateness(Duration.standardSeconds(10))
    .discardingFiredPanes())