When a fixed number of strings (800,000 messages of roughly 1 KB each, used for testing) is published to a PubSub topic and the following Apache Beam (2.1.0) job is run on Dataflow, exactly-once semantics are preserved as expected.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGsSimpleJob {

    public static void main(String[] args) {
        PubSubToGsPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(PubSubToGsPipelineOptions.class);

        Pipeline p = Pipeline.create(options);
        p.apply(PubsubIO.readStrings().fromSubscription(options.getInput()))
                // Window into fixed one-minute windows so windowed writes can close per-window files.
                .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
                // Write a single shard per window to the GCS output path.
                .apply(TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()));
        p.run();
    }
}
The PipelineOptions implementation is below:
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

public interface PubSubToGsPipelineOptions extends PipelineOptions {

    @Description("PubSub subscription")
    String getInput();
    void setInput(String input);

    @Description("Google Cloud Storage output path")
    String getOutput();
    void setOutput(String output);
}
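For context, the test input (800,000 messages of roughly 1 KB each) can be produced with a publisher along these lines. This is only an illustrative sketch; the actual publishing code isn't shown here, and the google-cloud-pubsub client usage, project, and topic names are placeholder assumptions.

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

import java.util.Arrays;

public class TestMessagePublisher {

    public static void main(String[] args) throws Exception {
        // Placeholder project and topic names (assumptions for this sketch).
        TopicName topic = TopicName.of("my-project", "my-topic");
        Publisher publisher = Publisher.newBuilder(topic).build();
        try {
            // Roughly 1 KB payload, matching the test data described above.
            char[] payload = new char[1024];
            Arrays.fill(payload, 'x');
            String body = new String(payload);

            // Publish the fixed number of test messages (800,000).
            for (int i = 0; i < 800_000; i++) {
                PubsubMessage message = PubsubMessage.newBuilder()
                        .setData(ByteString.copyFromUtf8(body))
                        .build();
                publisher.publish(message);
            }
        } finally {
            // Flush any batched messages before exiting.
            publisher.shutdown();
        }
    }
}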
However, if the same job is run, drained before all of the elements have been read (as shown in the Dataflow console), and then started again, the output files contain fewer records than the original data set that was published to the PubSub topic. This suggests that draining and replacing a job can cause data loss, which seems odd, since this Google Cloud blog post says that Drain and replace
should have at-least-once semantics. How should this pipeline be designed to achieve at-least-once semantics (or, better yet, exactly-once semantics) when draining and replacing the job?