How do I send a NACK to Pub/Sub when a Dataflow job is unable or unwilling to process a message?

Pipeline pipeline = Pipeline.create(options);

pipeline.apply("gcs2ZipExtractor-processor",
        PubsubIO.readMessagesWithAttributes()
            .fromSubscription(pubSubSubscription))
    .apply(ParDo.of(new ProcessZipFileEventDoFn(appProps)));
logger.info("Started ZipFile Extractor");
pipeline.run().waitUntilFinish();

Above is the code snippet I use to run the Apache Beam Dataflow pipeline job. If any failure happens in ProcessZipFileEventDoFn, I want to send a NACK to the Pub/Sub subscription so that the message is moved to the dead-letter topic. At present, the Dataflow runner never sends a NACK.

At the time of writing, the Apache Beam SDK has no support for Pub/Sub's native dead-letter queue feature. However, you can build your own fairly easily. The following is adapted to your code from this blog post. The trick is to use multiple outputs from a single ParDo: one output PCollection holds the "good" elements whose processing did not throw, and the other holds the "bad" elements whose processing threw an exception. You can then write every element of the dead-letter PCollection to a sink, in your case a Pub/Sub topic.

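import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

PCollection<PubsubMessage> input =
    pipeline.apply("gcs2ZipExtractor-processor",
        PubsubIO.readMessagesWithAttributes()
            .fromSubscription(pubSubSubscription));

// Put this try-catch logic in your ProcessZipFileEventDoFn, and don't forget
// the "withOutputTags"! The trailing {} creates anonymous subclasses so Beam
// can recover each tag's element type when inferring coders.
final TupleTag<String> successTag = new TupleTag<String>() {};
final TupleTag<PubsubMessage> deadLetterTag = new TupleTag<PubsubMessage>() {};

PCollectionTuple outputTuple = input.apply(ParDo.of(new DoFn<PubsubMessage, String>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      // process(...) is a placeholder for your own logic, assumed to return a String
      c.output(process(c.element()));
    } catch (Exception e) {
      // Route the original message to the dead-letter output instead of failing the bundle
      c.output(deadLetterTag, c.element());
    }
  }
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

// Write the dead-letter inputs to Pub/Sub for later analysis
// (deadLetterTopic is a placeholder for your dead-letter topic path)
outputTuple.get(deadLetterTag).apply(PubsubIO.writeMessages().to(deadLetterTopic));

// Retrieve the successful elements...
PCollection<String> success = outputTuple.get(successTag);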
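To see the routing logic in isolation, here is a plain-Java model of what the multi-output ParDo does, with no Beam dependency. It is a sketch for illustration only: `DeadLetterRouter`, `route`, `Result` and `Failure` are hypothetical names, the two lists stand in for the success and dead-letter PCollections, and `Integer::parseInt` stands in for the real per-message work (extracting the zip file). It also keeps the exception text next to each failed input, which is one possible way to make the dead-letter topic more useful for later analysis.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model of the success/dead-letter split; no Beam types involved. */
public class DeadLetterRouter {

  /** A failed input paired with a short description of why it failed. */
  public static final class Failure<I> {
    public final I input;
    public final String reason;

    Failure(I input, String reason) {
      this.input = input;
      this.reason = reason;
    }
  }

  /** The two "outputs": successfully processed results and dead letters. */
  public static final class Result<I, O> {
    public final List<O> successes = new ArrayList<>();
    public final List<Failure<I>> deadLetters = new ArrayList<>();
  }

  /** Applies process to each input; an exception diverts that one input instead of aborting the run. */
  public static <I, O> Result<I, O> route(List<I> inputs, Function<I, O> process) {
    Result<I, O> result = new Result<>();
    for (I input : inputs) {
      try {
        result.successes.add(process.apply(input));
      } catch (Exception e) {
        String reason = e.getClass().getSimpleName() + ": " + e.getMessage();
        result.deadLetters.add(new Failure<>(input, reason));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Parsing stands in for the real per-message processing.
    Result<String, Integer> r = route(List.of("1", "2", "oops", "4"), Integer::parseInt);
    System.out.println("successes: " + r.successes);
    for (Failure<String> f : r.deadLetters) {
      System.out.println("dead letter: " + f.input + " (" + f.reason + ")");
    }
  }
}
```

In the Beam version above, the two lists correspond to the outputs tagged `successTag` and `deadLetterTag`. The design choice is the same in both: catching the exception per element means one bad message is diverted, rather than the whole bundle failing and being retried.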