
I have the following code running locally without a cluster:

val count = new AtomicInteger()
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val text: DataStream[String] = env.readTextFile("file:///flink/data2")
// Parse each CSV line into a map with "user" and "val" fields.
val mapped: DataStream[Map[String, Any]] = text.map { x =>
  val fields = x.split(",")
  Map("user" -> fields(0), "val" -> fields(1))
}
val pattern: ...
CEP.pattern(mapped, pattern).select { eventMap =>
  println("Found: " + eventMap)
  count.incrementAndGet()
}

env.execute()
println(count)

My data is a CSV file in the following format (user, val):

1,1
1,2
1,3
2,1
2,2
2,3
...

I am trying to detect occurrences of the pattern event(val=1) -> event(val=2) -> event(val=3). When I run this on a large input stream containing a known number of such sequences, I get an inconsistent count of detected matches, almost always fewer than the number that actually exist in the stream. If I set env.setParallelism(1) (as I have done on line 3 of the code), all matches are detected.
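For reference, the pattern elided in the code above could be defined roughly like this with the CEP Scala API (a sketch: the step names are arbitrary, and the where/select signatures have changed across Flink versions):

import org.apache.flink.cep.scala.pattern.Pattern

// Strict sequence: an event with val=1, immediately followed by one
// with val=2, immediately followed by one with val=3.
val pattern = Pattern.begin[Map[String, Any]]("first").where(_("val") == "1")
  .next("second").where(_("val") == "2")
  .next("third").where(_("val") == "3")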

I assume the problem is that when the parallelism is greater than 1, multiple threads process events from the stream: while one thread holds event(val=1) -> event(val=2), event(val=3) might be routed to a different thread, and the pattern is never completed.

Is there something I'm missing here? I cannot afford to lose any matches in the stream, but setting the parallelism to 1 seems to defeat the purpose of using a system like Flink for event detection.

Update:

I have tried keying the stream using:

val mapped = text.map(...).keyBy(m => m("user"))

While this prevents events of different users from interfering with each other, as in:

1,1
2,2
1,3

it does not prevent Flink from delivering a single user's events to the node out of order, which means the non-determinism still exists.


2 Answers

Answer 1:

Have you thought about keying your stream by the user id (your first value)? Flink guarantees that all events with the same key are sent to the same processing node. Of course, that only helps if you want to detect the pattern val=1 -> val=2 -> val=3 per user.
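As a minimal sketch of that idea, assuming the Map[String, Any] event representation from the question (CEP then tracks the pattern separately for each key):

// All events of one user now reach the same parallel instance,
// so a user's partial match cannot be split across threads.
val keyed = mapped.keyBy(m => m("user"))
val matches = CEP.pattern(keyed, pattern)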

Answer 2:

Most probably the problem lies in applying the keyBy operator after the map operator.

So, instead of:

val mapped = text.map(...).keyBy(m => m("user"))

There should be:

val mapped = text.keyBy(line => line.split(",")(0)).map(...)
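Note that before the map the elements are still plain CSV lines, so the key has to be extracted from the string itself. Put together, the reordered pipeline might look roughly like this (a sketch based on the question's CSV format):

// Key on the raw user field first, then build the event map.
val events = text
  .keyBy(_.split(",")(0))
  .map { line =>
    val fields = line.split(",")
    Map[String, Any]("user" -> fields(0), "val" -> fields(1))
  }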

I know this is an old question, but maybe it helps someone.