
I just realized that my pipeline handles erroneous events incorrectly: they keep being processed over and over and are never removed from the subscription.

Basically, I have a simple pipeline with a trigger that pulls those events and writes them out to a file.

In one of its stages it processes the payload of the message received via PubSub and forwards it to the next stages. However, there are cases where this fails.

        pipeline
        .apply("Read PubSub Events",
            PubsubIO.readMessagesWithAttributes().fromSubscription(options.getSubscription()))
        .apply("Map to MyClass",
            ParDo.of(new PubSubMessageToMyClass())) // Exception thrown in this stage.
        .apply("Apply Timestamps", WithTimestamps.of(new SetTimestampFn()).withAllowedTimestampSkew(new Duration(Long.MAX_VALUE)))
        ...
        );

When the error happens, I keep seeing the same event over and over again in the pipeline, as if it never finishes processing.

Is there any way to explicitly tell Apache Beam to invalidate (drop) a given message so it isn't processed again after a failure?


1 Answer


Dataflow processes elements in arbitrary bundles, and will retry the complete bundle when an error is thrown for any element in that bundle. When running in batch mode, bundles including a failing item are retried 4 times. The pipeline will fail completely when a single bundle has failed 4 times. When running in streaming mode, a bundle including a failing item will be retried indefinitely, which may cause your pipeline to permanently stall.

Consider guarding against errors in your code by adding exception handlers. For example, if you'd like to drop elements that fail some custom input validation done in a ParDo, use a try/catch block within your ParDo to handle the exception and drop the element.
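For example, here is a minimal sketch of that approach, assuming your PubSubMessageToMyClass is (or can be written as) a DoFn<PubsubMessage, MyClass>; the convertToMyClass method below is just a placeholder for whatever conversion code currently throws:

        import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
        import org.apache.beam.sdk.transforms.DoFn;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;

        public class PubSubMessageToMyClass extends DoFn<PubsubMessage, MyClass> {

          private static final Logger LOG =
              LoggerFactory.getLogger(PubSubMessageToMyClass.class);

          @ProcessElement
          public void processElement(ProcessContext context) {
            PubsubMessage message = context.element();
            try {
              // Placeholder for your existing parsing/conversion logic,
              // i.e. the code that currently throws the exception.
              MyClass result = convertToMyClass(message);
              context.output(result);
            } catch (Exception e) {
              // Catch the error and emit nothing: the element is dropped,
              // the bundle completes successfully, and the message gets
              // acked instead of being redelivered indefinitely.
              LOG.warn("Dropping message that failed to convert", e);
            }
          }

          private MyClass convertToMyClass(PubsubMessage message) {
            // ... replace with your real conversion code ...
            throw new UnsupportedOperationException("replace with real parsing logic");
          }
        }

If you don't want to lose the failing messages entirely, the same try/catch can instead route them to a separate output collection (a dead-letter pattern using TupleTag and withOutputTags), but simply catching, logging, and dropping is enough to stop the endless reprocessing you're seeing.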