Currently I'm working on a semester project where I have to recognize a sequence of three events, such as P -> R -> P.

We have two different event types, which are consumed via a Kafka connector from the same topic.

I created a parent class called Event, from which the other two types derive.

The Kafka connector deserializes the JSON with EventSchema into the parent class Event.

val consumer = new FlinkKafkaConsumer("events", new EventSchema, properties)
val stream = env.addSource(consumer)

The pattern looks like this:

val pattern = Pattern
  .begin[Event]("before")
  .subtype(classOf[Position])
  .next("recognized")
  .subtype(classOf[Recognized])
  .next("after")
  .subtype(classOf[Position])

The problem is that when I send three messages in the appropriate format, the pattern is not matched.

I also tried changing the pattern like this:

val pattern = Pattern
  .begin[Event]("before")
  .where(e => e.getType == "position")
  .next("recognized")
  .where(e => e.getType == "recognition")
  .next("after")
  .where(e => e.getType == "position")

This pattern matches, but afterwards I can't cast the Event instances to Position or Recognized.

What am I missing here?

Maybe the elements that you pass to the pattern are plain Events? - Jiayi Liao
That's right, but isn't it possible to have different types of events, order them ascending by event time, and find a pattern in them? Whether all events come from one topic or each event type has its own topic should not make a difference. - Daniel Eisenreich
Did you initialize the objects with the subtype when deserializing from Kafka? - Jiayi Liao
I just deserialize everything as Event with val kafkaSource = new FlinkKafkaConsumer("sp", new EventSchema, properties), because at runtime multiple types are in one topic. But can I combine multiple Kafka sources with different types into one? - Daniel Eisenreich
Can you post the code of EventSchema here? I tried it following your description, and it works for me. - Jiayi Liao

1 Answer


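Based on the comments, I think your deserializer should return subtype instances instead of plain Event objects. subtype(...) checks the runtime class of each element, so an object that was created as Event never matches Position or Recognized, and it cannot be cast to them later either. Here is an example: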

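// First read the parent class, only to inspect the type field
val event = mapper.readValue(bytes, classOf[Event])
// Then read the same bytes again as the concrete subtype
event.getType match {
  case "position" => mapper.readValue(bytes, classOf[Position])
  case "recognition" => mapper.readValue(bytes, classOf[Recognized])
  // Fall back to the parent instance so that the match always yields an Event
  case _ => event
}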
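
To make the difference visible without Kafka, Jackson or Flink, here is a minimal plain-Java sketch. The classes Event, Position and Recognized are hypothetical stand-ins for the ones in the question, and isSubtype is only my assumption of the kind of runtime-class check that subtype(...) performs, not Flink's actual code:

```java
public class SubtypeDispatchSketch {

    // Hypothetical stand-ins for the classes from the question.
    public static class Event {
        private final String type;
        public Event(String type) { this.type = type; }
        public String getType() { return type; }
    }

    public static class Position extends Event {
        public Position() { super("position"); }
    }

    public static class Recognized extends Event {
        public Recognized() { super("recognition"); }
    }

    // Everything is created as the parent class: the runtime class is always Event.
    public static Event decodeAsParent(String type) {
        return new Event(type);
    }

    // Dispatch on the type tag: the runtime class is the concrete subtype.
    public static Event decodeAsSubtype(String type) {
        switch (type) {
            case "position": return new Position();
            case "recognition": return new Recognized();
            default: return new Event(type);
        }
    }

    // Runtime-class check (assumed to mirror what a subtype filter tests).
    public static boolean isSubtype(Class<? extends Event> expected, Event value) {
        return expected.isAssignableFrom(value.getClass());
    }

    public static void main(String[] args) {
        Event parent = decodeAsParent("position");
        Event child = decodeAsSubtype("position");

        System.out.println(isSubtype(Position.class, parent)); // prints "false"
        System.out.println(isSubtype(Position.class, child));  // prints "true"

        // The cast is only safe for the instance that was created as Position;
        // (Position) parent would throw a ClassCastException.
        Position position = (Position) child;
        System.out.println(position.getType()); // prints "position"
    }
}
```

Both objects carry the type "position", which is why the where(...) version of the pattern matches either of them, but only the second one passes the class check and can be cast.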

I successfully ran the following example, taken from a test case in CEPITCase.java:

DataStream<Event> input = env.fromElements(
  new Event(1, "foo", 4.0),
  new SubEvent(2, "foo", 4.0, 1.0),
  new SubEvent(3, "foo", 4.0, 1.0),
  new SubEvent(4, "foo", 4.0, 1.0),
  new Event(5, "middle", 5.0)
);

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").subtype(SubEvent.class)
.followedByAny("middle").subtype(SubEvent.class)
.followedByAny("end").subtype(SubEvent.class);