
I have two streams created from Kafka topics, and I'm joining them using the DataStream API. I want the results of the join (apply) to be published to another Kafka topic, but I don't see the results of the join in the output topic.

I confirmed I'm publishing proper data to both source topics, so I'm not sure where it is going wrong. Here is the code snippet.

The streams are created as shown below.

DataStream<String> ms1 = env
        .addSource(new FlinkKafkaConsumer<>("top1", new SimpleStringSchema(), prop))
        .assignTimestampsAndWatermarks(new WatermarkStrategy<String>() {
            @Override
            public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new AscendingTimestampsWatermarks<>();
            }

            @Override
            public TimestampAssigner<String> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return (event, timestamp) -> System.currentTimeMillis();
            }
        });

DataStream<String> ms2 = env
        .addSource(new FlinkKafkaConsumer<>("top2", new SimpleStringSchema(), prop))
        .assignTimestampsAndWatermarks(new WatermarkStrategy<String>() {
            @Override
            public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new AscendingTimestampsWatermarks<>();
            }

            @Override
            public TimestampAssigner<String> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return (event, timestamp) -> System.currentTimeMillis();
            }
        });

The stream join is performed using join-where-equals, as below.

DataStream<CountryData> joinedStreams = ms1.join(ms2)
        .where(o -> o.split("::")[0])
        .equalTo(o -> o.split("::")[0])
        .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
        .apply(new JoinFunction<String, String, CountryData>() {
            @Override
            public CountryData join(String o, String o2) throws Exception {
                String[] tokens1 = o.split("::");
                String[] tokens2 = o2.split("::");
                return new CountryData(tokens1[0], tokens1[1], tokens1[2],
                        Long.parseLong(tokens1[3]) + Long.parseLong(tokens2[3]));
            }
        });

The sink is added as below.

DataStreamSink<CountryData> dataStreamSink = joinedStreams
        .addSink(new FlinkKafkaProducer<CountryData>("localhost:9095", "flink-output", new CustomSchema()));
dataStreamSink.setParallelism(1);
dataStreamSink.name("KAFKA-TOPIC");

Any clue where it is going wrong? I can see the messages flowing through the topology. Thanks!


1 Answer


I think the two FlinkKafkaConsumer streams are missing a proper event-time extractor and watermark configuration: the timestamp assigner returns System.currentTimeMillis(), which is processing time, not event time.

Since the code uses an event-time window join, it needs time information associated with the data read from Kafka in order to know which time window each event corresponds to.

Without that, events from both streams are probably never close enough in event time to land in the same session defined by EventTimeSessionWindows.withGap(Time.seconds(60)).
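Concretely, the timestamp should come out of the record itself rather than the wall clock. Assuming (hypothetically) that your producer appends an epoch-millis event timestamp as the last `::`-delimited field (e.g. `"IN::a::b::42::1700000000000"` — your current records don't carry one), an extractor could look like the sketch below; in Flink it would then be wired in via something like `WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(30)).withTimestampAssigner((record, ts) -> EventTimeExtractor.extractTimestamp(record))`:

```java
public final class EventTimeExtractor {

    /**
     * Parses the last "::"-delimited field of a record as an epoch-millis
     * event timestamp. Returns -1 when the field is missing or not numeric,
     * so the caller can decide how to handle malformed records.
     * (Record layout is a hypothetical assumption, not the asker's actual schema.)
     */
    public static long extractTimestamp(String record) {
        String[] tokens = record.split("::");
        try {
            return Long.parseLong(tokens[tokens.length - 1]);
        } catch (NumberFormatException e) {
            return -1L;
        }
    }

    public static void main(String[] args) {
        // A record carrying an event time as its final field:
        System.out.println(extractTimestamp("IN::a::b::42::1700000000000"));
        // A record without a parsable timestamp falls back to -1:
        System.out.println(extractTimestamp("IN::a::b::42"));
    }
}
```

With event timestamps extracted this way, two records sharing a join key actually have a chance of falling within the same 60 s session gap.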

You also need to configure the watermark to tell Flink when to stop waiting for further data and materialize the output, so that you can see the join result.

Have a look at the Kafka connector time and watermark configuration for the various time extraction and watermarking possibilities you have.

Finally, make sure the test data you send to your application is spread over a long enough time period. With event-time processing, only "old enough" data makes it to the output; young data is always "stuck in transit". For example, with a 60 s window and, say, a 30 s watermark delay, you would need at least 90 s of data before you see anything in the output.
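The arithmetic behind that last point can be sketched as follows (assuming a 60 s session gap and a hypothetical 30 s out-of-orderness bound; the exact numbers depend on your configuration):

```java
public final class SessionWindowTiming {

    /**
     * A session window closes only when the watermark passes
     * (last event time + gap). With a bounded-out-of-orderness strategy,
     * the watermark trails the newest seen event time by the configured
     * delay, so results appear only after events at least
     * (gap + delay) newer than the session's last event have arrived.
     */
    public static long millisOfDataNeeded(long sessionGapMillis, long outOfOrdernessMillis) {
        return sessionGapMillis + outOfOrdernessMillis;
    }

    public static void main(String[] args) {
        // 60 s gap + 30 s watermark delay: the test data must span at least 90 s.
        System.out.println(millisOfDataNeeded(60_000, 30_000));
    }
}
```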