I'm writing a simple example to test the new Scala API for CEP in Flink, using the latest GitHub version of 1.1-SNAPSHOT.

The pattern only checks a single value and outputs one String for each match. The code is as follows:

val pattern : Pattern[(String, Long, Int), _] = Pattern.begin("start").where(_._3 < 4)

val cepEventAlert = CEP.pattern(streamingAlert, pattern)

def selectFn(pattern : mutable.Map[String, (String, Long, Int)]): String = {
    val startEvent = pattern.get("start").get
    "Alerta:"+startEvent._1+": Pattern"
}

val patternStreamSelected = cepEventAlert.select(selectFn(_))

patternStreamSelected.print()

It compiles and runs under 1.1-SNAPSHOT without issue, but the JobManager output shows no sign of that print(). Even relaxing the pattern to a bare "start" stage with no condition (accepting all events) returns absolutely nothing.

Also, when I try to add stages, the code fails to compile. If I change the pattern to the following, which looks for two consecutive events whose third field is less than 4:

Pattern.begin("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30))

The compiler then throws:

error: missing parameter type for expanded function ((x$4) => x$4._3.$less(4))

The error points to the first where() after the "start" stage. I tried to set the parameter type explicitly with:

(x: (String, Long, Int)) => x._3 < 4

That way it compiles again, but when it runs on Flink, no output is shown. streamingAlert is a Scala DataStream[(String, Long, Int)], and in other parts of the code I can filter it with _._3 < 4 without problems, and the output seems correct.

1 Answer

In the streaming API, the print() call does not trigger eager execution; it only adds a sink to the job. You still have to call env.execute() at the end of your program.

When you define your pattern, you have to provide the event type somewhere. Either do it the way you already have, or pass it as a type parameter to begin:

Pattern.begin[(String, Long, Int)]("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30))
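
Putting both points together, a complete job might look roughly like the sketch below. It assumes the 1.1-SNAPSHOT Scala CEP API used in the question (in particular a select that accepts a mutable.Map[String, T] => R function; other Flink versions may differ) and that flink-streaming-scala and flink-cep-scala are on the classpath. The object name CepAlertJob, the Event alias, the sample elements, and the job name are made up for illustration.

```scala
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

import scala.collection.mutable

// Hypothetical job object; only the Flink calls come from the question and answer.
object CepAlertJob {
  // Alias for the event tuple used throughout the question.
  type Event = (String, Long, Int)

  // Builds the alert text from the event matched by the "start" stage.
  def selectFn(matched: mutable.Map[String, Event]): String = {
    val startEvent = matched("start")
    "Alerta:" + startEvent._1 + ": Pattern"
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Illustrative input; the question does not show how streamingAlert is built.
    val streamingAlert: DataStream[Event] =
      env.fromElements(("a", 1L, 1), ("b", 2L, 2), ("c", 3L, 7))

    // The type parameter on begin lets the compiler infer the lambda
    // parameter types in every later where() call.
    val pattern = Pattern
      .begin[Event]("start").where(_._3 < 4)
      .next("end").where(_._3 < 4)
      .within(Time.seconds(30))

    // print() only registers a sink; nothing runs yet.
    CEP.pattern(streamingAlert, pattern).select(selectFn(_)).print()

    // This call actually submits and runs the job.
    env.execute("cep-alert-example")
  }
}
```

When the job runs on a cluster, print() writes to the standard output of the TaskManagers rather than the JobManager, so that is also worth checking if nothing shows up.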