I have a Beam pipeline, built with the Java SDK, that reads Avro messages from Kafka. The pipeline receives each message, assigns it an event timestamp and then tries to apply sliding windows:
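```java
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Use the message's own time field as the element's event timestamp.
PCollection<AvroMessage> message_timestamped =
    messageValues.apply(
        "append event time for PCollection records",
        WithTimestamps.of((AvroMessage rec) -> new Instant(rec.getTime())));

// 2-minute windows that start every minute.
PCollection<AvroMessage> messages_Windowed =
    message_timestamped.apply(
        Window.<AvroMessage>into(
                SlidingWindows.of(Duration.standardMinutes(2))
                    .every(Duration.standardMinutes(1)))
            .discardingFiredPanes());
```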
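Because the windows are 2 minutes long and a new one starts every minute, each element is assigned to two overlapping windows. The following is a pure-Java model of that assignment, intended only to show which windows a given event time falls into. It does not use Beam; the class and method names are hypothetical, and it assumes a zero window offset and non-negative epoch-millisecond timestamps.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free model of sliding-window assignment
// (assumes zero offset and non-negative epoch-millisecond timestamps).
class SlidingWindowAssignment {

    // Returns [start, end) pairs, in milliseconds, for every window containing timestampMs.
    static List<long[]> assign(long timestampMs, long sizeMs, long periodMs) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start at or before the timestamp.
        long lastStart = timestampMs - (timestampMs % periodMs);
        // Walk backwards one period at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= periodMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // An event at 00:01:30 with 2-minute windows starting every minute.
        for (long[] w : assign(90_000L, 2 * minute, minute)) {
            System.out.println("[" + w[0] / 1000 + "s, " + w[1] / 1000 + "s)");
        }
    }
}
```

Running `main` prints `[60s, 180s)` followed by `[0s, 120s)`: the event at 90 s belongs to both windows.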
Does the window fire after 2 minutes, or is a trigger configuration necessary? I tried to access the window pane information in a ParDo, but the ParDo is invoked for each received message and doesn't wait to accumulate the messages for the configured 2 minutes. What kind of trigger is required (after 2 minutes, process only the messages in the current window)?
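For reference, Beam's default trigger emits a window once the watermark passes the end of that window, and elements are only collected per window at an aggregation step such as `GroupByKey` or `Combine`. The sketch below is a simplified, dependency-free model of that event-time behaviour, not Beam code: the class and method names are hypothetical, and late data, allowed lateness and multiple panes are ignored. It buffers elements per sliding window and releases a window only when the watermark reaches the window's end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of an event-time trigger: elements are buffered per window and a
// window is emitted only once the watermark reaches its end. Late data, allowed
// lateness and repeated panes are deliberately ignored.
class WatermarkFiringModel {
    private final long sizeMs;
    private final long periodMs;
    // Window start -> event timestamps buffered for that window, ordered by start.
    private final TreeMap<Long, List<Long>> buffers = new TreeMap<>();

    WatermarkFiringModel(long sizeMs, long periodMs) {
        this.sizeMs = sizeMs;
        this.periodMs = periodMs;
    }

    // Buffers an element in every sliding window that contains its timestamp
    // (assumes zero offset and non-negative timestamps).
    void add(long timestampMs) {
        long lastStart = timestampMs - (timestampMs % periodMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= periodMs) {
            buffers.computeIfAbsent(start, s -> new ArrayList<>()).add(timestampMs);
        }
    }

    // Advances the watermark and returns the windows that fire, keyed by window start.
    Map<Long, List<Long>> advanceWatermark(long watermarkMs) {
        Map<Long, List<Long>> fired = new TreeMap<>();
        // All windows share one size, so the earliest start is also the earliest end.
        while (!buffers.isEmpty() && buffers.firstKey() + sizeMs <= watermarkMs) {
            Map.Entry<Long, List<Long>> e = buffers.pollFirstEntry();
            fired.put(e.getKey(), e.getValue());
        }
        return fired;
    }

    public static void main(String[] args) {
        WatermarkFiringModel model = new WatermarkFiringModel(120_000L, 60_000L);
        model.add(70_000L);  // falls in [0s, 120s) and [60s, 180s)
        model.add(130_000L); // falls in [60s, 180s) and [120s, 240s)
        System.out.println(model.advanceWatermark(100_000L)); // nothing complete yet
        System.out.println(model.advanceWatermark(120_000L)); // [0s, 120s) fires
        System.out.println(model.advanceWatermark(180_000L)); // [60s, 180s) fires
    }
}
```

Running `main` prints `{}`, then `{0=[70000]}`, then `{60000=[70000, 130000]}`. In this model, window output depends on the watermark (event time), not on 2 minutes of wall-clock time passing.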
Do I need any specific configuration to run this with unbounded Kafka messages?
I have set a timestamp policy on the KafkaIO read so that it uses the message timestamp:
```java
.withTimestampPolicyFactory(
    (tp, previousWaterMark) -> new CustomFieldTimePolicy(previousWaterMark))
```
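`CustomFieldTimePolicy` itself is not shown here. Assuming it follows the common pattern of a watermark that trails the largest event timestamp seen so far by a fixed allowed delay, the sketch below models that behaviour without any Beam dependency. The class name, the 30-second delay and the method names are hypothetical; it is not Beam's `TimestampPolicy` API, although Beam's Kafka module does ship a `CustomTimestampPolicyWithLimitedDelay` class built along broadly similar lines.

```java
// Hypothetical, dependency-free model of an event-time watermark policy: the watermark
// trails the largest event timestamp seen so far by a fixed delay and never moves back.
class MaxEventTimeWatermark {
    private final long maxDelayMs;
    private long watermarkMs;

    MaxEventTimeWatermark(long maxDelayMs, long previousWatermarkMs) {
        this.maxDelayMs = maxDelayMs;
        // Resume from the previous watermark, as the factory lambda above passes it in.
        this.watermarkMs = previousWatermarkMs;
    }

    // Records an element's event time (e.g. the time field of the Avro message) and returns it.
    long onRecord(long eventTimeMs) {
        // Older, out-of-order events never pull the watermark backwards.
        watermarkMs = Math.max(watermarkMs, eventTimeMs - maxDelayMs);
        return eventTimeMs;
    }

    long watermark() {
        return watermarkMs;
    }

    public static void main(String[] args) {
        MaxEventTimeWatermark wm = new MaxEventTimeWatermark(30_000L, 0L);
        wm.onRecord(100_000L);
        wm.onRecord(80_000L); // out-of-order event
        System.out.println(wm.watermark());
    }
}
```

Running `main` prints `70000`. In this model the watermark only moves when a new message arrives, so with the 2-minute windows configured above and a 30-second delay, the window ending at 120 s would only be considered complete after a message with an event time of at least 150 s has been read.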