
I am trying to perform a stream-stream join using a Flink v1.11 app on KDA (Kinesis Data Analytics). The join works with ProcessingTime, but with EventTime I don't see any output records from Flink.

Here is my code with EventTime processing, which is not working:

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    DataStream<Trade> input1 = createSourceFromInputStreamName1(env)
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Trade>forMonotonousTimestamps()
                            .withTimestampAssigner((event, l) -> event.getEventTime())
            );
    DataStream<Company> input2 = createSourceFromInputStreamName2(env)
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Company>forMonotonousTimestamps()
                            .withTimestampAssigner((event, l) -> event.getEventTime())
            );
    DataStream<String> joinedStream = input1.join(input2)
            .where(new TradeKeySelector())
            .equalTo(new CompanyKeySelector())
            .window(TumblingEventTimeWindows.of(Time.seconds(30)))
            .apply(new JoinFunction<Trade, Company, String>() {
                @Override
                public String join(Trade t, Company c) {
                    return t.getEventTime() + ", " + t.getTicker() + ", " + c.getName() + ", " + t.getPrice();
                }
            });
    joinedStream.addSink(createS3SinkFromStaticConfig());
    env.execute("Flink S3 Streaming Sink Job");
}

I got a similar join working with ProcessingTime:

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    DataStream<Trade> input1 = createSourceFromInputStreamName1(env);
    DataStream<Company> input2 = createSourceFromInputStreamName2(env);
    DataStream<String> joinedStream = input1.join(input2)
            .where(new TradeKeySelector())
            .equalTo(new CompanyKeySelector())
            .window(TumblingProcessingTimeWindows.of(Time.milliseconds(10000)))
            .apply(new JoinFunction<Trade, Company, String>() {
                @Override
                public String join(Trade t, Company c) {
                    return t.getEventTime() + ", " + t.getTicker() + ", " + c.getName() + ", " + t.getPrice();
                }
            });
    joinedStream.addSink(createS3SinkFromStaticConfig());
    env.execute("Flink S3 Streaming Sink Job");
}

Sample records from the two streams which I am trying to join:

{'eventTime': 1611773705, 'ticker': 'TBV', 'price': 71.5}
{'eventTime': 1611773705, 'ticker': 'TBV', 'name': 'The Bavaria'}

1 Answer


I don't see anything obviously wrong, but any of the following could cause this job to produce no output:

  • A problem with watermarking. For example, if one of the streams becomes idle, the watermarks will cease to advance. Or if no events arrive after a window ends, the watermark will never advance far enough to close that window. Or if the timestamps are not actually in ascending order (with the forMonotonousTimestamps strategy, events must be in order by timestamp), the pipeline could be silently dropping all of the out-of-order events as late.
  • The StreamingFileSink only finalizes (commits) its output during checkpointing. If checkpointing is not enabled, or the job is stopped before a checkpoint completes, whatever files are still pending are never finalized.
  • A windowed join behaves like an inner join, and requires at least one event from each input stream in order to produce any results for a given window interval. From the example you shared, it looks like this is not the issue.
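The window-closing point in the first bullet can be sketched in plain Java (no Flink needed). The window size and the sample eventTime come from the question; the 1 ms watermark lag mirrors how forMonotonousTimestamps trails the largest timestamp seen so far, and the timestamps are assumed to be epoch milliseconds:

```java
public class WindowCheck {
    // Tumbling windows of `size` ms: the window containing `ts` starts at ts - (ts % size).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // An event-time window [start, start + size) fires only once the watermark
    // reaches start + size - 1, i.e. passes the end of the window.
    static boolean windowFires(long watermark, long windowStart, long size) {
        return watermark >= windowStart + size - 1;
    }

    public static void main(String[] args) {
        long size = 30_000;          // 30-second tumbling windows, in ms
        long ts = 1611773705_000L;   // sample eventTime, assuming it is scaled to ms
        long start = windowStart(ts, size);
        System.out.println("window: [" + start + ", " + (start + size) + ")");

        // With only this one event, the watermark sits at ts - 1,
        // which is not enough to close the window the event fell into:
        System.out.println(windowFires(ts - 1, start, size));         // false
        // Only a later event at or beyond the window end advances the
        // watermark far enough for the window to fire:
        System.out.println(windowFires(start + size, start, size));   // true
    }
}
```

This is why a stream with no traffic after the window boundary produces no output: nothing ever pushes the watermark past the end of the open window.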

Update:

Given that what you appear to want is to join each Trade with the latest Company record available at the time of the trade, a lookup join or a temporal table join seem like good approaches.
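The semantics of an event-time temporal join, i.e. matching each Trade against the Company version that was valid at the trade's event time, can be sketched with a plain-Java versioned map. This illustrates the semantics only, not the Flink API; the second company record below is hypothetical:

```java
import java.util.Map;
import java.util.TreeMap;

public class TemporalLookup {
    // Returns the latest version whose timestamp is <= asOf, or null if none existed yet.
    static String versionAsOf(TreeMap<Long, String> versions, long asOf) {
        Map.Entry<Long, String> e = versions.floorEntry(asOf);
        return e == null ? null : e.getValue();
    }

    public static void main(String[] args) {
        // Versioned "Company" table for one ticker: eventTime -> name.
        TreeMap<Long, String> company = new TreeMap<>();
        company.put(1611773705L, "The Bavaria");        // from the sample record
        company.put(1611773800L, "The Bavaria Group");  // hypothetical later update

        // Each trade joins against the company version valid at its event time:
        System.out.println(versionAsOf(company, 1611773750L)); // prints "The Bavaria"
        System.out.println(versionAsOf(company, 1611773800L)); // prints "The Bavaria Group"
    }
}
```

Unlike the windowed join, this never waits for a window to close; each trade is enriched with whatever company version preceded it.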

Here are a couple of examples:

https://github.com/ververica/flink-sql-cookbook/blob/master/joins/04/04_lookup_joins.md

https://github.com/ververica/flink-sql-cookbook/blob/master/joins/03/03_kafka_join.md

Some documentation:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#event-time-temporal-join

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/versioned_tables.html