First of all, I have already found this question: "flink program behaves differently in parallelism". It looks like the same issue I'm facing right now, but I think I do need CEP in my scenario, because every hour I have to analyze more than 1,000,000 records that belong to many different user keys.
When I run CEP with parallelism 1 everything works fine, even for different user keys, but it is a bit slow because Flink has to analyze the users one by one in a single thread. This operation needs to be fast enough to recognize the pattern and send a notification within, for example, 1 minute, which is why I need more than one parallel thread.
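To make the keyed-parallelism point concrete, here is a minimal plain-Java sketch (no Flink dependency; the class and method names are hypothetical) of how keyed partitioning routes records: the target subtask is computed only from the key, so every record of one user lands in the same parallel instance. Flink's real keyBy hashes keys into key groups with a murmur hash instead of using hashCode() modulo parallelism, so treat subtaskFor() as a stand-in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyPartitioningSketch {

    // Stand-in for Flink's key-to-subtask assignment: it depends only on the key.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Routes (userKey, pageName) pairs to subtasks, keeping arrival order within each subtask.
    static Map<Integer, List<String>> route(List<String[]> events, int parallelism) {
        Map<Integer, List<String>> bySubtask = new TreeMap<>();
        for (String[] e : events) {
            int subtask = subtaskFor(e[0], parallelism);
            bySubtask.computeIfAbsent(subtask, i -> new ArrayList<>()).add(e[0] + ":" + e[1]);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        List<String[]> events = List.of(
                new String[] {"user-a", "step1"},
                new String[] {"user-b", "step1"},
                new String[] {"user-a", "step2"},
                new String[] {"user-c", "step1"},
                new String[] {"user-a", "step3"});
        // With 4 "threads", all user-a records still end up in a single subtask.
        route(events, 4).forEach((subtask, records) ->
                System.out.println("subtask " + subtask + " -> " + records));
    }
}
```

The sketch only models routing; it says nothing about the order in which records reach a subtask when several upstream instances feed it.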
In my case I'm using a RichFlatMapFunction with ValueState that keeps the previously matched page, so it can recognize the next step of the pattern and then send the notification. Here is my code:
```java
// Events from RabbitMQ, parsed and keyed by user id
final KeyedStream<EventPush, String> eventsStream = RabbitMQConnector.eventStreamObject(env)
        .flatMap(new RabbitMQPushConsumer())
        .keyBy(k -> k.id);

private static SingleOutputStreamOperator<String> getPushToSend(KeyedStream<EventPush, String> stream) {
    // Detect the page sequence per user, then serialize each match to JSON
    return stream.flatMap(new WebPushFlatMapFunction())
            .map(x -> new ObjectMapper().writeValueAsString(x));
}
```
```java
/* The code below belongs to the WebPushFlatMapFunction class, which is the RichFlatMapFunction using ValueState.
 * Fields referenced here and declared elsewhere in the class: previous (ValueState<Tuple4<String, String, Long, Timestamp>>),
 * pattern, full_pattern and home (page names), and LOG. */

// True if the two timestamps differ by more than 0 ms and by at most 2 minutes
private boolean inTime(long start, long end) {
    final long difference = Math.abs(end - start);
    final long timeFrame = 120000L; // 2 minutes
    return difference > 0 && timeFrame >= difference;
}

@Override
public void flatMap(EventPush value, Collector<EventPush> out) {
    final String pageName = value.pageName.trim();
    // prev = (idsUmid, last matched page, time of the first step in ms, time of the last step)
    Tuple4<String, String, Long, Timestamp> prev;
    try {
        prev = previous.value();
        // Step 1: the first page of the sequence (re)starts tracking for this user
        if (pageName.equalsIgnoreCase(pattern)) {
            LOG.info("umid " + value.idsUmid + " match (" + pattern + ") at: " + value.timestamp);
            previous.update(new Tuple4<>(value.idsUmid, pageName, value.timestamp.getTime(), value.timestamp));
        }
        if (prev != null) {
            if (inTime(value.timestamp.getTime(), prev.f2)) {
                // Step 3: home after full_pattern -> emit the notification
                if ((prev.f1 != null && !prev.f1.equals("")) && prev.f1.equals(full_pattern) && pageName.equals(home) && prev.f3.before(value.timestamp)) {
                    if (PropertyFileReader.isWebPushLoggerActivated())
                        LOG.info("umid " + value.idsUmid + " match (" + home + ") triggered at: " + value.timestamp);
                    prev.f1 = "";
                    previous.update(prev); // write the reset back; some backends (e.g. RocksDB) return a copy from value()
                    out.collect(value);
                }
                // Step 2: full_pattern after pattern -> remember it and wait for home
                if ((prev.f1 != null && !prev.f1.equals("")) && prev.f1.equals(pattern) && pageName.equals(full_pattern) && prev.f3.before(value.timestamp)) {
                    LOG.info("umid " + value.idsUmid + " match (" + full_pattern + ") at: " + value.timestamp);
                    prev.f3 = value.timestamp;
                    prev.f1 = pageName;
                    previous.update(prev);
                }
            }
        }
    } catch (IOException e) {
        CatchHandler.generalCatchHandler(e);
    }
}
```
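The matching logic above can be modelled outside Flink to see why arrival order matters. This is a simplified plain-Java sketch, not the original class: a HashMap keyed by user stands in for the keyed ValueState, the page names pattern, full_pattern and home are placeholder strings, and a completed sequence removes the entry instead of resetting it to "". Like inTime() above, it measures the 2-minute window from step 1 and requires each step to be later than the previous one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SequenceMatcherSketch {
    // Placeholder page names for the three steps (hypothetical values).
    static final String PATTERN = "pattern";
    static final String FULL_PATTERN = "full_pattern";
    static final String HOME = "home";
    static final long TIME_FRAME_MS = 120_000L; // same 2-minute window as inTime()

    static final class Event {
        final String user;
        final String page;
        final long ts;
        Event(String user, String page, long ts) { this.user = user; this.page = page; this.ts = ts; }
    }

    // Per-user progress: last matched page, time of step 1, time of the last step.
    static final class Progress {
        final String lastPage;
        final long startTs;
        final long lastTs;
        Progress(String lastPage, long startTs, long lastTs) {
            this.lastPage = lastPage; this.startTs = startTs; this.lastTs = lastTs;
        }
    }

    private final Map<String, Progress> state = new HashMap<>(); // stands in for keyed ValueState
    final List<Event> notifications = new ArrayList<>();

    static boolean inTime(long a, long b) {
        long diff = Math.abs(a - b);
        return diff > 0 && diff <= TIME_FRAME_MS;
    }

    void process(Event e) {
        if (e.page.equals(PATTERN)) {
            state.put(e.user, new Progress(PATTERN, e.ts, e.ts)); // step 1 (re)starts the sequence
            return;
        }
        Progress prev = state.get(e.user);
        if (prev == null || !inTime(e.ts, prev.startTs) || prev.lastTs >= e.ts) {
            return; // no sequence in progress, window expired, or event not later than the last step
        }
        if (prev.lastPage.equals(PATTERN) && e.page.equals(FULL_PATTERN)) {
            state.put(e.user, new Progress(FULL_PATTERN, prev.startTs, e.ts)); // step 2
        } else if (prev.lastPage.equals(FULL_PATTERN) && e.page.equals(HOME)) {
            state.remove(e.user);  // step 3: sequence complete, clear the state
            notifications.add(e);  // and record the "notification"
        }
    }

    public static void main(String[] args) {
        SequenceMatcherSketch inOrder = new SequenceMatcherSketch();
        inOrder.process(new Event("u1", PATTERN, 1_000));
        inOrder.process(new Event("u1", FULL_PATTERN, 2_000));
        inOrder.process(new Event("u1", HOME, 3_000));

        SequenceMatcherSketch shuffled = new SequenceMatcherSketch();
        shuffled.process(new Event("u1", HOME, 3_000)); // step 3 arrives first
        shuffled.process(new Event("u1", PATTERN, 1_000));
        shuffled.process(new Event("u1", FULL_PATTERN, 2_000));

        System.out.println("in order: " + inOrder.notifications.size());  // prints 1
        System.out.println("shuffled: " + shuffled.notifications.size()); // prints 0
    }
}
```

Fed in the order 1, 2, 3, the matcher produces one notification; when step 3 arrives first it is ignored because no sequence is in progress yet, so the same three events produce none.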
With parallelism 1 the three steps arrive in the proper order: 1, 2, 3 (pattern, full_pattern, home). With a higher parallelism I can receive step 1 in one thread and step 3 in another, even though all three events belong to the same user key, and I assume these three events are being partitioned across different threads.

My question: is there any way to do this with more parallelism?

Kind regards.