Just realized that my pipeline is wrong when it comes to erroneous events, they keep on being processed and never removed from the subscription.
Basically I have a simple pipeline which contains a trigger that would pull those events out in a file.
In one of its phases it does process the payload of the message received via PubSub and forwards it to next stages. However, there are cases where this will fail.
pipeline
.apply("Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromSubscription(options.getSubscription()))
.apply("Map to MyClass",
ParDo.of(new PubSubMessageToMyClass())) // Exception thrown in this stage.
.apply("Apply Timestamps", WithTimestamps.of(new SetTimestampFn()).withAllowedTimestampSkew(new Duration(Long.MAX_VALUE)))
...
);
When the error happens, I keep on seeing the same event over and over again in the pipeline, like it never ends processing.
Is there any way to explicitly tell Apache Beam to invalidate a given message and prevent further failed processing?