
I have a Beam pipeline, based on the Java SDK, that reads Avro messages from Kafka. The pipeline receives the messages and tries to create a sliding window:

PCollection<AvroMessage> message_timestamped =  
    messageValues
        .apply(
            "append event time for PCollection records", 
            WithTimestamps.of(
                (AvroMessage rec) -> new Instant(rec.getTime())));
PCollection<AvroMessage> messages_Windowed = 
    message_timestamped
        .apply(
            Window
                .<AvroMessage>into(
                    SlidingWindows
                        .of(Duration.standardMinutes(2))
                        .every(Duration.standardMinutes(1)))
                .discardingFiredPanes());

Does the window fire after 2 minutes, or is a trigger configuration necessary? I tried to access the window pane information in a ParDo, but the ParDo is invoked for each received message; it doesn't wait to accumulate the messages for the configured 2 minutes. What kind of trigger is required (after 2 minutes, process only the current window's messages)?

Do I need to include any specific configuration to run with unbounded Kafka messages?

I have used a timestamp policy so that the KafkaIO read operation uses the message timestamp:

.withTimestampPolicyFactory(
    (tp, previousWaterMark) -> new CustomFieldTimePolicy(previousWaterMark))

1 Answer


It is important to consider that windows and triggers have very different purposes:

  • Windows are based on the timestamps in the data, not on when they arrive or when they are processed. I find the best way to think about "windows" is as a secondary key. When data is unbounded/infinite, you need one of the grouping keys to have an "end" - a timestamp when you can say they are "done". Windows provide this "end". If you want to control how your data is aggregated, use windows.
  • Triggers are a way to try to control how output flows through your pipeline. They are not closely related to your business logic. If you want to manage the flow of data, use triggers.
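
To make the "secondary key" idea concrete, here is a minimal plain-Java sketch of how a timestamp alone decides which sliding windows an element belongs to. It does not use Beam; the class and method names are made up for illustration, and the arithmetic is only assumed to mirror what a sliding-window assignment with zero offset does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam: models sliding-window assignment by event time.
final class SlidingWindowSketch {

    /** Returns the [start, end) bounds, in millis, of every window containing the timestamp. */
    static List<long[]> assign(long timestampMillis, long sizeMillis, long periodMillis) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that begins at or before the timestamp.
        long lastStart = timestampMillis - Math.floorMod(timestampMillis, periodMillis);
        // Walk backwards one period at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMillis - sizeMillis; start -= periodMillis) {
            windows.add(new long[] {start, start + sizeMillis});
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Event time 00:02:30, 2-minute windows sliding every minute.
        for (long[] w : assign(150_000L, 2 * minute, minute)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With a 2-minute size and a 1-minute period, every element lands in two windows, and nothing about arrival or processing time enters the calculation.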

To answer your specific questions:

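  • Windows do not wait. An element that arrives may be assigned to a window that is "done" 1ms after it arrives. This is just fine. A ParDo runs once per element regardless of windowing, which is why yours is invoked for every message; elements are only collected per window at an aggregation such as GroupByKey or Combine.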
  • Since you have not changed the default trigger, you will get one output with all of the elements for a window.
  • You also do not need discardingFiredPanes. Your configuration only produces one output per aggregation, so this has no effect.
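
The behaviour described in these points can be sketched as a small plain-Java model. This is not Beam code and not how a runner is implemented; the class name and methods are hypothetical, and it assumes zero allowed lateness. Elements are buffered per window as they arrive, and a window produces its single output only once the watermark reaches the window's end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of "one output per window when the watermark passes its end".
final class DefaultTriggerSketch {
    private final long sizeMillis;
    private final long periodMillis;
    private long watermark = Long.MIN_VALUE;
    // Buffered elements keyed by window start.
    private final TreeMap<Long, List<String>> buffers = new TreeMap<>();

    DefaultTriggerSketch(long sizeMillis, long periodMillis) {
        this.sizeMillis = sizeMillis;
        this.periodMillis = periodMillis;
    }

    /** Buffers the element in every still-open window containing its event time. */
    void onElement(String value, long eventTimeMillis) {
        long lastStart = eventTimeMillis - Math.floorMod(eventTimeMillis, periodMillis);
        for (long start = lastStart; start > eventTimeMillis - sizeMillis; start -= periodMillis) {
            if (start + sizeMillis <= watermark) {
                continue; // window already done: the element is late and is dropped
            }
            buffers.computeIfAbsent(start, k -> new ArrayList<>()).add(value);
        }
    }

    /** Advances the watermark and emits one pane per window that is now done. */
    List<String> onWatermark(long watermarkMillis) {
        watermark = Math.max(watermark, watermarkMillis);
        List<String> panes = new ArrayList<>();
        while (!buffers.isEmpty() && buffers.firstKey() + sizeMillis <= watermark) {
            Map.Entry<Long, List<String>> e = buffers.pollFirstEntry();
            panes.add("[" + e.getKey() + ", " + (e.getKey() + sizeMillis) + ") -> " + e.getValue());
        }
        return panes;
    }

    public static void main(String[] args) {
        DefaultTriggerSketch sketch = new DefaultTriggerSketch(120_000L, 60_000L);
        sketch.onElement("a", 30_000L);
        sketch.onElement("b", 90_000L);
        System.out.println(sketch.onWatermark(120_000L));
    }
}
```

Because each window fires exactly once in this model, there is no earlier pane to discard or accumulate, which is why discardingFiredPanes makes no difference here.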

But there is actually a problem that you will want to fix: the watermark (this controls when a window is "done") is determined by the source. Using WithTimestamps does not change the watermark. You will need to specify the timestamp in the KafkaIO transform, using withTimestampPolicyFactory. Otherwise, the watermark will move according to the publish time and may declare data late or drop data.
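
As a rough illustration of why the source of the watermark matters, the plain-Java sketch below compares two ways of deriving it for the same records. Everything here is an assumption made for the example: the names are invented, the "maximum event time seen minus a fixed delay" rule is just one simple heuristic, and the numbers describe a backlog that is read a few minutes after the events happened.

```java
// Hypothetical comparison of a read-time watermark and an event-time watermark.
final class WatermarkSourceSketch {

    /** A window is done, and new elements for it are late, once the watermark reaches its end. */
    static boolean isLate(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis;
    }

    /** Watermark derived from record timestamps: the maximum seen so far minus a fixed delay. */
    static long eventTimeWatermark(long[] eventTimesSeen, long maxDelayMillis) {
        if (eventTimesSeen.length == 0) {
            return Long.MIN_VALUE; // nothing seen yet, so nothing can be declared done
        }
        long max = Long.MIN_VALUE;
        for (long t : eventTimesSeen) {
            max = Math.max(max, t);
        }
        return max - maxDelayMillis;
    }

    public static void main(String[] args) {
        long windowEnd = 120_000L;                   // window [00:00, 00:02)
        long[] eventTimesSeen = {30_000L, 60_000L};  // timestamps carried by the records
        long readTime = 210_000L;                    // the backlog is read at 00:03:30

        // Watermark follows the clock at read time: the window is already done.
        System.out.println(isLate(windowEnd, readTime));
        // Watermark follows the record timestamps: the window is still open.
        System.out.println(isLate(windowEnd, eventTimeWatermark(eventTimesSeen, 30_000L)));
    }
}
```

In the first case any further element with an event time inside the window would be treated as late, even though it belongs there; in the second the window stays open until the record timestamps themselves move past its end.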