3
votes

I have a simple job (Apache Beam SDK for Java 2.2.0) that read messages from PubSub subscription, read configs from side input, apply transformations to messages and send results to another PubSub topic

The issue is that the number of outgoing messages is not equal to the number of incoming messages. I'm inserting 15 millions messages very quickly from another job (without manually specifying a timestamp). Issue seems to come with the presence of side input because without I have no more loss. In Dataflow monitoring we can see about 20000 lost messages.

Job ID on DataflowRunner: 2018-01-17_05_33_45-3290466857677892673

enter image description here

If I restart the same job the number of lost messages is not the same

I created simple snippets to illustrate my issue

Publisher

String PROJECT_ID = "...";

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);

p
    .apply(GenerateSequence.from(0).to(15000000))
    .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
    .apply(PubsubIO.writeStrings().to("projects/" + PROJECT_ID + "/topics/test_in"));

p.run();

Listenner

String PROJECT_ID = "...";

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);

PCollectionView<Long> sideInput = p
    .apply(GenerateSequence.from(0).to(10))
    .apply(Count.globally())
    .apply(View.asSingleton());

p
    // 15,000,000 in input
    .apply(PubsubIO.readMessages().fromSubscription("projects/" + PROJECT_ID + "/subscriptions/test_in"))
    .apply(ParDo.of(new DoFn<PubsubMessage, PubsubMessage>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(c.element());
        }
    }).withSideInputs(sideInput))
    // 14,978,010 in output
    .apply(PubsubIO.writeMessages().to("projects/" + PROJECT_ID + "/topics/test_out"));

p.run();
1
Counters in streaming Dataflow are not exact. If you put a Count.globally() at the end of the main branch of your pipeline and output/log the result, do you also see the discrepancy?danielm
Yes I tried with a "Reader" job that read messages from "test_out" topic, apply "Combine.globally(Count)" and log manually result COUNT=14978010harscoet

1 Answers

2
votes

The issue is most likely due to late data dropping. You can address it by setting a windowing strategy with an infinite allowed lateness.