I am trying to perform stream-stream join using Flink v1.11 app on KDA. Join wrt to ProcessingTime
works, but with EventTime
I don’t see any output records from Flink.
Here is my code with EventTime processing which is not working,
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Trade> input1 = createSourceFromInputStreamName1(env)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Trade>forMonotonousTimestamps()
.withTimestampAssigner(((event, l) -> event.getEventTime()))
);
DataStream<Company> input2 = createSourceFromInputStreamName2(env)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Company>forMonotonousTimestamps()
.withTimestampAssigner(((event, l) -> event.getEventTime()))
);
DataStream<String> joinedStream = input1.join(input2)
.where(new TradeKeySelector())
.equalTo(new CompanyKeySelector())
.window(TumblingEventTimeWindows.of(Time.seconds(30)))
.apply(new JoinFunction<Trade, Company, String>() {
@Override
public String join(Trade t, Company c) {
return t.getEventTime() + ", " + t.getTicker() + ", " + c.getName() + ", " + t.getPrice();
}
});
joinedStream.addSink(createS3SinkFromStaticConfig());
env.execute("Flink S3 Streaming Sink Job");
}
I got a similar join working with ProcessingTime
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Trade> input1 = createSourceFromInputStreamName1(env);
DataStream<Company> input2 = createSourceFromInputStreamName2(env);
DataStream<String> joinedStream = input1.join(input2)
.where(new TradeKeySelector())
.equalTo(new CompanyKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(10000)))
.apply (new JoinFunction<Trade, Company, String> (){
@Override
public String join(Trade t, Company c) {
return t.getEventTime() + ", " + t.getTicker() + ", " + c.getName() + ", " + t.getPrice();
}
});
joinedStream.addSink(createS3SinkFromStaticConfig());
env.execute("Flink S3 Streaming Sink Job");
}
Sample records from two streams which I am trying to join:
{'eventTime': 1611773705, 'ticker': 'TBV', 'price': 71.5}
{'eventTime': 1611773705, 'ticker': 'TBV', 'name': 'The Bavaria'}