
I’m having a bit of trouble testing the new Flink 1.0.0 functionality. I’ve been tinkering with CEP and haven’t yet managed to run a simple demo:

import org.apache.flink.cep.{CEP, PatternSelectFunction}
import org.apache.flink.cep.pattern.Pattern

val pattern : Pattern[TrafficEvent, _] = Pattern.begin[TrafficEvent]("start")
val patternStream = CEP.pattern(stream.javaStream, pattern)

class MyPatternSelectFunction extends PatternSelectFunction[TrafficEvent, TrafficEvent] {
    override def select(pattern : java.util.Map[String, TrafficEvent]) : TrafficEvent = {
        pattern.get("start")
    }
}
val alerts = patternStream.select(new MyPatternSelectFunction())

The code compiles fine, and Maven shows no warnings. TrafficEvent is a class with a few simple fields, and stream is a Scala DataStream of that class. The error shows up when the code runs on Flink: it runs for about a second and then exits with this error message:

The program finished with the following exception:

  Input mismatch: Tuple type expected.
            org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:878)
            org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:302)
            org.apache.flink.cep.PatternStream.select(PatternStream.java:64)
            com.demo.DemoTraffic$.main(DemoTraffic.scala:311)

I’ve tried moving the functionality to Java by writing a static helper method like this (maybe there are some odd issues with calling the API from Scala):

import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;

public static DataStream<DemoTraffic.trafficEvent> getStreamByPattern(DataStream<DemoTraffic.trafficEvent> stream) {
  Pattern<DemoTraffic.trafficEvent, ?> pattern = Pattern.<DemoTraffic.trafficEvent>begin("start");
  PatternStream<DemoTraffic.trafficEvent> patternStream = CEP.pattern(stream, pattern);
  DataStream<DemoTraffic.trafficEvent> rvalue = patternStream.select(new PatternSelectFunction<DemoTraffic.trafficEvent, DemoTraffic.trafficEvent>() {
    @Override
    public DemoTraffic.trafficEvent select(Map<String, DemoTraffic.trafficEvent> pattern) throws Exception {
      return pattern.get("start");
    }
  });
  return rvalue;
}

But the result is exactly the same: it throws the same error at the PatternStream.select line. Any hints about what I could try or what I’m doing wrong? As you can see, the pattern is deliberately trivial and only for testing purposes: it accepts every event and returns that same event. I’m using Flink 1.0.0 built for Scala 2.10.

Thanks

What's the definition of TrafficEvent? - Till Rohrmann
Could you try your example with the latest SNAPSHOT? Maybe it has something to do with the recently fixed issue issues.apache.org/jira/browse/FLINK-3563. - twalthr

1 Answer


I assume that TrafficEvent is a Scala case class. The CEP library was written for Flink's Java API and therefore does not support Scala case classes yet.
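
One way to check this assumption is to compare the TypeInformation that the Scala API derives for a case class and for a plain class. This is a sketch assuming flink-scala 1.0.0 (Scala 2.10) on the classpath; CaseEvent and PlainEvent are hypothetical stand-ins for TrafficEvent, whose definition isn't shown. As far as I can tell from the 1.0 sources, the Scala API describes case classes with CaseClassTypeInfo, a subclass of TupleTypeInfoBase that reports itself as a tuple type, and TypeExtractor's input validation then demands a Java Tuple subclass, which would explain "Input mismatch: Tuple type expected."

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
import org.apache.flink.api.scala._

// Hypothetical stand-ins for the question's TrafficEvent (its definition is not shown).
case class CaseEvent(sensorId: String, speed: Double)

class PlainEvent(var sensorId: String, var speed: Double) {
  // Public no-argument constructor, one of Flink's POJO requirements.
  def this() = this("", 0.0)
}

object TypeInfoCheck {
  def main(args: Array[String]): Unit = {
    // createTypeInformation is the type-analysis macro from the Scala API package object.
    val caseInfo: TypeInformation[CaseEvent] = createTypeInformation[CaseEvent]
    val plainInfo: TypeInformation[PlainEvent] = createTypeInformation[PlainEvent]

    // Report whether each TypeInformation is tuple-like, the property that
    // (as far as I can tell) TypeExtractor checks before demanding a Java Tuple.
    val infos = Seq[(String, TypeInformation[_])]("CaseEvent" -> caseInfo, "PlainEvent" -> plainInfo)
    for ((name, info) <- infos) {
      val tupleLike = info.isInstanceOf[TupleTypeInfoBase[_]]
      println(s"$name: isTupleType=${info.isTupleType}, TupleTypeInfoBase=$tupleLike")
    }
  }
}
```

If the assumption holds, the case class should report true for both checks and the plain class false, since a plain class is analysed as a POJO or generic type instead.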

As a workaround, you could translate your case class into a normal Scala class.
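
A minimal sketch of that workaround, assuming flink-streaming-scala and flink-cep 1.0.0 for Scala 2.10 on the classpath. The fields sensorId and speed, and the names StartSelector and CepWorkaround, are hypothetical; the pattern and the select logic are the same as in the question. Public var fields plus a no-argument constructor should let Flink analyse the class as a POJO (or at worst as a generic type), so the Java CEP API no longer sees a tuple-like CaseClassTypeInfo.

```scala
import java.util.{Map => JMap}

import org.apache.flink.cep.{CEP, PatternSelectFunction}
import org.apache.flink.cep.pattern.Pattern
import org.apache.flink.streaming.api.scala._

// Plain (non-case) class with hypothetical fields; var fields get Scala
// getters/setters and the auxiliary constructor provides a no-arg constructor.
class TrafficEvent(var sensorId: String, var speed: Double) {
  def this() = this("", 0.0)
  override def toString: String = s"TrafficEvent($sensorId, $speed)"
}

// Returns the event bound to the "start" state of the pattern.
class StartSelector extends PatternSelectFunction[TrafficEvent, TrafficEvent] {
  override def select(pattern: JMap[String, TrafficEvent]): TrafficEvent =
    pattern.get("start")
}

object CepWorkaround {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[TrafficEvent] = env.fromElements(
      new TrafficEvent("s1", 42.0),
      new TrafficEvent("s2", 87.5))

    // Same single-state pattern as in the question.
    val pattern: Pattern[TrafficEvent, _] = Pattern.begin[TrafficEvent]("start")

    // The CEP library is Java-only here, so it is applied to the underlying Java DataStream.
    val alerts = CEP.pattern(stream.javaStream, pattern).select(new StartSelector)

    alerts.print()
    env.execute("CEP with a plain Scala class")
  }
}
```

If the rest of the program relies on case-class features such as pattern matching or copy, another option under the same assumption is to keep the case class and map it to a plain class like this one just before calling CEP.pattern.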

There is also a JIRA ticket which tracks the development of the CEP Scala API.