2 votes

We're currently big users of Dataflow batch jobs and want to start using Dataflow streaming if it can be done reliably.

Here is a common scenario: we have a very large Kafka topic that we need to do some basic ETL or aggregation on, and a non-idempotent upstream queue. Here is an example of our Kafka data:


    ID   | msg | timestamp (mm:ss)
    -----|-----|------------------
    1    |  A  |  01:00
    2    |  B  |  01:01
    3    |  D  |  06:00
    4    |  E  |  06:01
    4.3  |  F  |  06:01
    ...  | ... |  ...   (millions more)
    4.5  |  ZZ |  19:58




Oops: at some point the data changes from integers to decimals, which will eventually cause some elements to fail, requiring us to kill the pipeline, possibly modify the downstream service, and possibly make minor code changes to the Dataflow pipeline.

In Spark Structured Streaming, because of the ability to use external checkpoints, we would be able to restart a streaming job and resume processing the queue where the previous job left off (successfully processing), for exactly-once processing. In a vanilla or Spring Boot Java application we could loop through with a Kafka consumer and, only after writing results to our 'sink', commit offsets.
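
To make the comparison concrete, here is a minimal sketch of that plain-Java consumer loop; the broker address, topic, group id, and `writeToSink` helper are all placeholders, not our real code:

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ManualCommitConsumer {

      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "etl-consumer");               // placeholder group id
        props.put("enable.auto.commit", "false");            // we control commits ourselves
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
          consumer.subscribe(List.of("my-topic"));            // placeholder topic
          while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
              writeToSink(record.value());                    // placeholder sink write
            }
            // Commit only after every record in the batch has been written to the
            // sink; a crash before this line means the batch is re-read and
            // re-processed on restart, never silently skipped.
            consumer.commitSync();
          }
        }
      }

      private static void writeToSink(String value) {
        // Placeholder for the non-idempotent downstream write.
      }
    }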

My overall question is: can we achieve similar functionality in Dataflow? I'll list some of my assumptions and concerns:

  1. It seems here in KafkaIO there is no relationship between the offset-commit PCollection and the user's one. Does that mean they can drift apart?
  2. It seems here in KafkaOffsetCommit that a window of five minutes is taken and the highest offset emitted, but this is not wall time, it is Kafka record time. Going back to our sample data, it looks to me like the entire queue's offsets would be committed (in chunks of five minutes) as fast as possible; see the sketch after this list for how I'm reading that windowing. This means that if we have only finished processing up to record F in the first five minutes, we may have committed almost the entire queue's offsets?
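
To spell out how I'm reading that windowing, here is my own sketch built from standard Beam transforms and made-up data, not the actual KafkaOffsetCommit code:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Max;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    public class OffsetWindowSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        // Hypothetical (partition, offset) pairs stamped with Kafka *record* time,
        // mirroring the sample data above (01:00, 01:01, 06:00, 06:01, ...).
        PCollection<KV<String, Long>> offsets =
            p.apply(
                Create.timestamped(
                    TimestampedValue.of(KV.of("p0", 1L), Instant.parse("2024-01-01T00:01:00Z")),
                    TimestampedValue.of(KV.of("p0", 2L), Instant.parse("2024-01-01T00:01:01Z")),
                    TimestampedValue.of(KV.of("p0", 3L), Instant.parse("2024-01-01T00:06:00Z")),
                    TimestampedValue.of(KV.of("p0", 4L), Instant.parse("2024-01-01T00:06:01Z"))));

        // Five-minute *event-time* windows (record time, not wall time), keeping the
        // maximum offset per partition in each window. With a large backlog the
        // watermark races ahead, so many of these windows can close almost at once.
        PCollection<KV<String, Long>> maxOffsetPerWindow =
            offsets
                .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(5))))
                .apply(Max.<String>longsPerKey());

        p.run().waitUntilFinish();
      }
    }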

Now, in our scenario, where the pipeline started failing around record F, it seems our only choices are to start from the beginning or to lose data?

I believe this might be overcome with a lot of custom code (a custom DoFn to ensure the Kafka consumer never commits) and some custom code for our upstream sink that would eventually commit offsets; a rough sketch of that idea follows below. Is there a better way to do this, and/or are some of my assumptions wrong about how offset management is handled in Dataflow?
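
To make that idea concrete, here is roughly how I imagine wiring it up. This is only a sketch; the brokers, topic, and WriteToSinkFn are hypothetical, and whether any of this is actually necessary is exactly what I'm asking:

    import java.util.Map;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ManualOffsetPipeline {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        p.apply(
                KafkaIO.<Long, String>read()
                    .withBootstrapServers("broker:9092")          // placeholder brokers
                    .withTopic("my-topic")                        // placeholder topic
                    .withKeyDeserializer(LongDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    // Ensure the underlying consumer never auto-commits; any
                    // committing would then have to happen in our own sink code.
                    .withConsumerConfigUpdates(
                        Map.<String, Object>of("enable.auto.commit", false))
                    .withoutMetadata())
            .apply(ParDo.of(new WriteToSinkFn()));                // placeholder sink DoFn

        p.run().waitUntilFinish();
      }

      // Placeholder for the non-idempotent sink write; in this idea it is also where
      // offsets would eventually be committed back to Kafka once the write succeeds.
      static class WriteToSinkFn extends DoFn<KV<Long, String>, Void> {
        @ProcessElement
        public void processElement(ProcessContext c) {
          // write c.element() to the downstream service ...
        }
      }
    }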



1 Answer

2 votes

Thank you for the detailed question!

  1. In Beam (hence Dataflow), all of the outputs for a "bundle" are committed together, along with all state updates, checkpoints, etc., so there is no drift between different output PCollections. In this specific case, the offsets are extracted directly from the elements to be output, so they correspond precisely. The outputs and offsets are both durably committed to Dataflow's internal storage before the offset is committed back to Kafka.
  2. You are correct that the offsets from the elements already processed are grouped into 5-minute event-time windows (Kafka record time) and the maximum offset is taken. While 5 minutes is an arbitrary duration, the offsets correspond to elements that have been successfully pulled off the queue.