I am using Flink CEP to detect patterns in events coming from Kafka. For simplicity, events have only one type. I am trying to detect a change in the value of a field in the continuous event stream. The code looks like the following:
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv
  .addSource(new FlinkKafkaConsumer[..]())
  .filter(...)
  .map(...)
  .assignTimestampsAndWatermarks(
    WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(..)
  )
  .keyBy(...)(TypeInformation.of(classOf[...]))
val pattern: Pattern[Event, _] =
  Pattern.begin[Event]("start", AfterMatchSkipStrategy.skipPastLastEvent()).times(1)
    .next("middle")
    .oneOrMore()
    .optional()
    .where(new IterativeCondition[Event] {
      override def filter(event: Event, ctx: ...): Boolean = {
        val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
        startTrafficEvent.getFieldValue().equals(event.getFieldValue())
      }
    })
    .next("end").times(1)
    .where(new IterativeCondition[Event] {
      override def filter(event: Event, ctx: ...): Boolean = {
        val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
        !startTrafficEvent.getFieldValue().equals(event.getFieldValue())
      }
    })
    .within(Time.seconds(30))
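To make the intended semantics concrete, here is a plain-Scala model (no Flink; `Event` and `fieldValue` are hypothetical stand-ins for my real event type, a `Seq` stands in for the stream, and the 30-second `within` window is ignored) of what the pattern is meant to match: a run of events with the same field value ("start" plus optional "middle" repeats), terminated by the first event with a different value ("end"):

```scala
// Plain-Scala model of the pattern's matching semantics (no Flink involved).
// Event and fieldValue are placeholders for the real event type.
case class Event(fieldValue: String, timestamp: Long)

// Returns (startEvent, endEvent) pairs: a match begins at the first event of a
// run of equal values and ends at the first event whose value differs. After a
// match, matching restarts only at events after the "end" event, mimicking
// AfterMatchSkipStrategy.skipPastLastEvent(). The within(30s) window is omitted.
def detectChanges(events: Seq[Event]): Seq[(Event, Event)] = {
  val out = scala.collection.mutable.ListBuffer[(Event, Event)]()
  var start: Option[Event] = None
  for (e <- events) {
    start match {
      case None =>
        start = Some(e)                       // begin a new "start"
      case Some(s) if s.fieldValue != e.fieldValue =>
        out += ((s, e))                       // value changed: emit a match
        start = None                          // skip past the matched events
      case _ =>
        ()                                    // same value: "middle" events
    }
  }
  out.toList
}
```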
The Kafka topic has 104 partitions, and events are distributed evenly across them. When I submitted the job, parallelism was set to 104.
From the Web UI, there were two tasks: the first was Source -> filter -> map -> timestamp/watermark, and the second was CepOperator -> sink. Each task had a parallelism of 104.
The workload across subtasks was uneven, which presumably comes from keyBy. Watermarks differed among subtasks, but then they got stuck at one value, with no change for a long time. From the logs, I can see that CEP kept evaluating events and matched results were being pushed to the downstream sink.
The event rate was 10k/s. Backpressure on the first task stayed high, while the second one was OK.
Please help explain what happened in CEP and how to fix the issue. Thanks.