I have the following code running locally without a cluster:
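import java.util.concurrent.atomic.AtomicInteger
import org.apache.flink.streaming.api.scala._
import org.apache.flink.cep.scala.CEP

// Counts how many pattern matches were reported
val count = new AtomicInteger()
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val text: DataStream[String] = env.readTextFile("file:///flink/data2")
// Parse each CSV line "user,val" into a map, splitting the line only once
val mapped: DataStream[Map[String, Any]] = text.map((x: String) => { val fields = x.split(","); Map("user" -> fields(0), "val" -> fields(1)) })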
val pattern: ...
CEP.pattern(mapped, pattern).select(eventMap => {
println("Found: " + (patternName, eventMap))
count.incrementAndGet()
})
env.execute()
println(count)
My data is a CSV file in the following format (user, val):
1,1
1,2
1,3
2,1
2,2
2,3
...
I am trying to detect occurrences of the pattern event(val=1) -> event(val=2) -> event(val=3). When I run this on a large input stream containing a known number of such sequences, I get an inconsistent count of detected patterns, almost always fewer than the number actually present in the stream. If I call env.setParallelism(1) (as I have done in line 3 of the code), all of them are detected.
I assume the problem is that, when the parallelism is > 1, multiple threads process the events from the stream. While one thread holds event(val=1) -> event(val=2), event(val=3) might be sent to a different thread, so the complete pattern might never be detected.
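To make that assumption concrete, here is a minimal plain-Scala sketch with no Flink involved. Everything in it is hypothetical: `PartitionSketch`, `countMatches`, and `roundRobin` are names made up for illustration, and round-robin is only a toy model of how events might be spread across parallel threads, not a claim about how Flink actually distributes them.

```scala
object PartitionSketch {
  // An event is (user, val), mirroring the CSV rows above.
  type Event = (String, Int)

  // Counts 1 -> 2 -> 3 sequences per user in arrival order.
  // Any unexpected value resets that user's progress (strict contiguity).
  def countMatches(events: Seq[Event]): Int = {
    // user -> number of pattern steps matched so far (0, 1 or 2)
    val progress = scala.collection.mutable.Map.empty[String, Int]
    var matches = 0
    for ((user, value) <- events) {
      val expected = progress.getOrElse(user, 0) + 1
      if (value == expected) {
        if (value == 3) { matches += 1; progress(user) = 0 }
        else progress(user) = value
      } else {
        // A fresh 1 can start a new attempt; anything else resets.
        progress(user) = if (value == 1) 1 else 0
      }
    }
    matches
  }

  // Toy model (an assumption): event i goes to worker i % workers.
  def roundRobin(events: Seq[Event], workers: Int): Seq[Seq[Event]] =
    events.zipWithIndex
      .groupBy { case (_, i) => i % workers }
      .toSeq
      .sortBy(_._1)
      .map { case (_, indexed) => indexed.map(_._1) }

  val sample: Seq[Event] =
    Seq(("1", 1), ("1", 2), ("1", 3), ("2", 1), ("2", 2), ("2", 3))

  def main(args: Array[String]): Unit = {
    // One worker sees every event in order: both sequences are found.
    println(countMatches(sample)) // 2
    // Two workers each see only part of every sequence: nothing is found.
    println(roundRobin(sample, 2).map(countMatches).sum) // 0
  }
}
```

With a single worker both sequences are counted. Once the same events are split across two independent matchers, no matcher ever sees a complete 1 -> 2 -> 3 run, which is the behaviour I suspect.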
Is there something I'm missing here? I cannot lose any patterns in the stream, but setting parallelism to 1 seems to defeat the purpose of having a system like Flink to detect events.
Update:
I have tried keying the stream using:
val mapped: KeyedStream[Map[String, Any], Option[Any]] = text.map(...).keyBy((m) => m.get("user"))
This prevents events of different users from interfering with each other, for example in an interleaving such as:
1,1
2,2
1,3
However, it does not prevent Flink from sending the events to the node out of order, so the non-determinism still exists.
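The remaining problem can be shown with another small plain-Scala sketch, again without Flink. `KeyedOrderSketch`, `keyByUser`, and `countMatches` are hypothetical names used only for illustration. Grouping by user here stands in for keyBy and keeps the arrival order within each key, which is an assumption about what the matcher effectively sees.

```scala
object KeyedOrderSketch {
  // An event is (user, val), mirroring the CSV rows above.
  type Event = (String, Int)

  // Toy stand-in for keyBy: group by user, keeping arrival order per key.
  def keyByUser(events: Seq[Event]): Map[String, Seq[Int]] =
    events.groupBy(_._1).map { case (user, evs) => user -> evs.map(_._2) }

  // Counts contiguous 1, 2, 3 runs within each user's values.
  def countMatches(events: Seq[Event]): Int =
    keyByUser(events).values
      .map(values => values.sliding(3).count(_ == Seq(1, 2, 3)))
      .sum

  // Users interleaved, but each user's events arrive in order.
  val interleaved: Seq[Event] =
    Seq(("1", 1), ("2", 1), ("1", 2), ("2", 2), ("1", 3), ("2", 3))

  // Same events, but user 1's val=3 arrives before val=2.
  val reordered: Seq[Event] =
    Seq(("1", 1), ("2", 1), ("1", 3), ("2", 2), ("1", 2), ("2", 3))

  def main(args: Array[String]): Unit = {
    println(countMatches(interleaved)) // 2
    println(countMatches(reordered))   // 1
  }
}
```

Keying copes with interleaved users, so both sequences are found in the first case. In the second case user 1's events arrive as 1, 3, 2, so that sequence is lost even though the stream is keyed.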