
I have a Dataflow setup with multiple pipelines fetching data from Pub/Sub topics. Since these pipelines fan out and merge through chains of transforms and DoFns, I need to trace each ingested Pub/Sub message throughout the pipeline.

What would be the right way to do this? Some thoughts:

  1. Side Input
  2. Each input to a ParDo function carries a context object with tracing ids, etc. (a bit unintuitive; see the sketch below)
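
For option 2, a minimal sketch of what such a context object might look like (the Traced wrapper and its names are hypothetical, not an established API):

import java.io.Serializable;

// Hypothetical wrapper: carries the originating Pub/Sub message id alongside
// the payload, so every downstream DoFn can log against the same trace id.
// Implementing Serializable keeps SerializableCoder an option (assuming the
// payload type is serializable as well).
public class Traced<T> implements Serializable {
  private final String pubsubMessageId;
  private final T payload;

  public Traced(String pubsubMessageId, T payload) {
    this.pubsubMessageId = pubsubMessageId;
    this.payload = payload;
  }

  public String getPubsubMessageId() { return pubsubMessageId; }
  public T getPayload() { return payload; }
}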

Thanks!

What is the purpose of the tracing? Are you trying to log the id of each Pub/Sub message as it progresses through the pipeline? – Lukasz Cwik
How many messages are you trying to trace (all, a random sample, some specific set of ids)? – Lukasz Cwik
@LukaszCwik Yes, I am trying to log the id of each Pub/Sub message as it progresses through the pipeline. At each step there is a transformation, and data is enriched from various sources along with multiple validations. If any step fails for some reason, I would like to keep track of it so that it's easy to debug. – Yauza

1 Answer


I believe your second approach makes the most sense.

Inside your @ProcessElement method you could catch any exceptions and log the failures:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.beam.sdk.transforms.DoFn;
import ...

public class MyDoFn extends DoFn<ObjectWithPubsubIdA, ObjectWithPubsubIdB> {
  private static final Logger LOG = LoggerFactory.getLogger(MyDoFn.class);

  @ProcessElement
  public void processElement(ProcessContext c) {
    ObjectWithPubsubIdA a = c.element();
    try {
      ObjectWithPubsubIdB b = // transform ObjectWithPubsubIdA ...
      c.output(b);
    } catch (Exception e) {
      LOG.error("MyDoFn failed for message with id {} with exception {}", a.getId(), e);
    }
  }
}

You could use an abstract base class or some other language-specific construct to reuse the code, so that one implementation is shared across all your transforms.
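
A minimal sketch of such a base class, assuming your elements can expose their Pub/Sub id (the TracingDoFn, convert, and idOf names are hypothetical). Beam finds the @ProcessElement method on the superclass, so each concrete transform only implements the happy path:

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class TracingDoFn<InT, OutT> extends DoFn<InT, OutT> {
  private static final Logger LOG = LoggerFactory.getLogger(TracingDoFn.class);

  @ProcessElement
  public void processElement(ProcessContext c) {
    InT in = c.element();
    try {
      c.output(convert(in));
    } catch (Exception e) {
      // The concrete subclass name identifies which pipeline step failed.
      LOG.error("{} failed for message with id {}",
          getClass().getSimpleName(), idOf(in), e);
    }
  }

  // Subclasses implement only the transformation itself.
  protected abstract OutT convert(InT in) throws Exception;

  // How to extract the trace id from an input element.
  protected abstract String idOf(InT in);
}

Each step in the pipeline then becomes a small subclass, for example:

public class EnrichFn extends TracingDoFn<ObjectWithPubsubIdA, ObjectWithPubsubIdB> {
  @Override
  protected ObjectWithPubsubIdB convert(ObjectWithPubsubIdA a) throws Exception {
    ObjectWithPubsubIdB b = // transform ObjectWithPubsubIdA ...
    return b;
  }

  @Override
  protected String idOf(ObjectWithPubsubIdA a) {
    return a.getId();
  }
}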