I'm still new to Flink CEP library and yet I don't understand the pattern detection behavior. Considering the example below, I have a Flink app that consumes data from a kafka topic, data is produced periodically, I want to use Flink CEP pattern to detect when a value is bigger than a given threshold. The code is below:
public class CEPJob{
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(),
properties);
consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
DataStream<String> stream = env.addSource(consumer);
// Process incoming data.
DataStream<Stock> inputEventStream = stream.map(new MapFunction<String, Stock>() {
private static final long serialVersionUID = -491668877013085114L;
@Override
public Stock map(String value) {
String[] data = value.split(":");
System.out.println("Date: " + data[0] + ", Adj Close: " + data[1]);
Stock stock = new Stock(data[0], Double.parseDouble(data[1]));
return stock;
}
});
// Create the pattern
Pattern<Stock, ?> myPattern = Pattern.<Stock>begin("first").where(new SimpleCondition<Stock>() {
private static final long serialVersionUID = -6301755149429716724L;
@Override
public boolean filter(Stock value) throws Exception {
return (value.getAdj_Close() > 140.0);
}
});
// Create a pattern stream from our warning pattern
PatternStream<Stock> myPatternStream = CEP.pattern(inputEventStream, myPattern);
// Generate alert for each matched pattern
DataStream<Stock> warnings = myPatternStream .select((Map<String, List<Stock>> pattern) -> {
Stock first = pattern.get("first").get(0);
return first;
});
warnings.print();
env.execute("CEP job");
}
}
What happens when I run the job, pattern detection doesn't happen in real-time, it outputs the warning for the detected pattern of the current record only after a second record is produced, it looks like it's delayed to print to the log the warining, I really didn't understand how to make it outputs the warning the time it detect the pattern without waiting for next record and thank you :) .
Data coming from Kafka are in string format: "date:value", it produce data every 5 secs.
Java version: 1.8, Scala version: 2.11.12, Flink version: 1.12.2, Kafka version: 2.3.0