0 votes

I am new to Flink. I am replacing the Kafka Streams API with Flink, because Kafka Streams internally creates multiple internal topics, which adds overhead.

However, in the Flink job, all I am doing is:

  1. Dedupe the records in a given window (1 hr). (Window(TumblingEventTimeWindows(3600000), EventTimeTrigger, Job$$Lambda$1097/1241473750, PassThroughWindowFunction))
    deDupedStream = deserializedStream
                        .keyBy(msg -> new StringBuilder()
                                .append("XXX").append("YYY"))
                        .timeWindow(Time.milliseconds(3600000)) // 1 hour
                        .reduce((event1, event2) -> {
                            event2.setEventTimeStamp(Math.max(event1.getEventTimeStamp(), event2.getEventTimeStamp()));
                            return event2;
                        })
                        .setParallelism(mapParallelism > 0 ? mapParallelism : defaultMapParallelism);
  2. After deduping, I do another level of windowing and count the records before producing to the Kafka topic. (Window(TumblingEventTimeWindows(3600000), EventTimeTrigger, Job$$Lambda$1101/2132463744, PassThroughWindowFunction) -> Map)
SingleOutputStreamOperator<XXXObject> countedStream = deDupedStream
                .filter(event -> event.getXXX() != null)
                .map(this::buildXXXObject)
                .returns(XXXObject.class)
                .setParallelism(deDupMapParallelism > 0 ? deDupMapParallelism : defaultDeDupMapParallelism)
                .keyBy(itemInterimMsg -> String.valueOf("key1") + "key2" + "key3")
                .timeWindow(Time.milliseconds(3600000))
                .reduce((existingMsg, currentMsg) -> { // Aggregate
                    currentMsg.setCount(existingMsg.getCount() + currentMsg.getCount());
                    return currentMsg;
                })
                .setParallelism(deDupMapParallelism > 0 ? deDupMapParallelism : defaultDeDupMapParallelism);

countedStream.addSink(kafkaProducerSinkFunction);

With the above setup, my assumption is that the destination Kafka topic will get the aggregated results every 3600000 ms (1 hour). But the Grafana graph shows results being emitted roughly every 30 minutes. I do not understand why, when the window is still a 1-hour range. Any suggestions?

Attached below is the Kafka destination topic's emit pattern.

It looks like you are doing event time windowing -- are you processing historic or real-time data? And what is the graph showing? Perhaps the graph is showing that you are processing historic data at a rate of something like 2 hours of historic data for every hour of processing. - David Anderson
I am processing near real-time data. Event time is the time we are using. I updated the Grafana picture. The graph shows the number of messages emitted in each time interval; there are emits roughly every 30 minutes. - Deepak
It's not clear how the keyBys are working, or how you are accomplishing the deduplication by manipulating timestamps. But I believe the answer lies there, perhaps in combination with how the timestamp assigner and watermark generator are implemented. - David Anderson
This is the watermark strategy I am using: ``` WatermarkStrategy.<PlImaGuidInterimMessageVO>forBoundedOutOfOrderness(Duration.ofMillis(maxOutOfOrderness)).withTimestampAssigner((event, timestamp) -> event.getEventTimeStamp()) ``` - Deepak
Regarding dedupe, the logic posted is exactly what I am doing. I am keying by 4 keys and accumulating the records for a given timeWindow. I am using event time, so I am using the watermark strategy mentioned above. Once the data is accumulated for a given key grouping, I reduce the key group to a single event by updating the timestamp to that of the latest event in the group. Let me know if you have questions. - Deepak

1 Answer

0 votes

While I can't fully diagnose this without seeing more of the project, here are some points that you may have overlooked:

  • When the Flink Kafka producer is used in exactly-once mode, it only commits its output when checkpointing. Consumers of your job's output, if set to read committed, will only see results when checkpoints complete. How often is your job checkpointing?

  • When the Flink Kafka producer is used in at-least-once mode, it can produce duplicated output. Is your job restarting at regular intervals?

  • Flink's event time window assigners use the timestamps in the stream record metadata to determine the timing of each event. These metadata timestamps are set when you call assignTimestampsAndWatermarks. Calling setEventTimeStamp in the window reduce function has no effect on these timestamps in the metadata.

  • The stream record metadata timestamps on events emitted by a time window are set to the end time of the window, and those are the timestamps considered by the window assigner of any subsequent window.

  • keyBy(msg -> new StringBuilder().append("XXX").append("YYY")) is partitioning the stream by a constant, and will assign every record to the same partition (see the sketch after this list for keying by the actual field values).

  • The second keyBy (right before the second window) is replacing the first keyBy (rather than imposing further partitioning).
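
To make the constant-key point and the checkpointing point concrete, here is a minimal, self-contained sketch. It is not your actual job: the Message POJO and its fields, the sample elements, the 60-second checkpoint interval, and the 5-minute bounded out-of-orderness are all placeholder assumptions. It only illustrates keying by the record's actual field values and setting an explicit checkpoint interval, since the checkpoint interval is what gates the visible output of an exactly-once Kafka producer.

```
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class DedupeSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // With an exactly-once Kafka producer, output is only committed when a
        // checkpoint completes, so the checkpoint interval (not just the window
        // size) controls how often read-committed consumers see new results.
        env.enableCheckpointing(60_000); // example value; tune to your latency needs

        // Stand-in for the real Kafka source and message type from the question.
        DataStream<Message> deserializedStream = env.fromElements(
                new Message("a", "b", 1_000L),
                new Message("a", "b", 2_000L),
                new Message("c", "d", 1_500L));

        DataStream<Message> deDupedStream = deserializedStream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Message>forBoundedOutOfOrderness(Duration.ofMinutes(5))
                                .withTimestampAssigner((msg, ts) -> msg.getEventTimeStamp()))
                // Key by the record's actual field values. The original
                // new StringBuilder().append("XXX").append("YYY") builds the same
                // value for every record, so all records fall into one key group.
                .keyBy(msg -> msg.getXxx() + "|" + msg.getYyy())
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .reduce((a, b) -> {
                    b.setEventTimeStamp(Math.max(a.getEventTimeStamp(), b.getEventTimeStamp()));
                    return b;
                });

        // The question feeds this into a second keyBy/window/reduce and a Kafka sink;
        // printing is enough here to observe when the window emits.
        deDupedStream.print();
        env.execute("dedupe-sketch");
    }

    /** Placeholder POJO standing in for the poster's message class. */
    public static class Message {
        private String xxx;
        private String yyy;
        private long eventTimeStamp;

        public Message() {}

        public Message(String xxx, String yyy, long eventTimeStamp) {
            this.xxx = xxx;
            this.yyy = yyy;
            this.eventTimeStamp = eventTimeStamp;
        }

        public String getXxx() { return xxx; }
        public void setXxx(String xxx) { this.xxx = xxx; }
        public String getYyy() { return yyy; }
        public void setYyy(String yyy) { this.yyy = yyy; }
        public long getEventTimeStamp() { return eventTimeStamp; }
        public void setEventTimeStamp(long eventTimeStamp) { this.eventTimeStamp = eventTimeStamp; }
    }
}
```

The "|" separator is just one way to build a deterministic composite key; keying by a Tuple of the individual fields works equally well. The same applies to the second keyBy before the counting window, which completely replaces the first partitioning.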