I'm currently working on a semester project in which I have to recognize a sequence of three events, such as P -> R -> P.

We have two different event types, both consumed via a Kafka connector from the same topic.

I created a parent class called Event, from which the other two types derive.

The Kafka connector uses EventSchema to deserialize the JSON into the parent class Event.

val consumer = new FlinkKafkaConsumer("events", new EventSchema, properties)
val stream = env.addSource(consumer)

The pattern looks like this:

val pattern = Pattern
  .begin[Event]("before")
  .subtype(classOf[Position])
  .next("recognized")
  .subtype(classOf[Recognized])
  .next("after")
  .subtype(classOf[Position])

The problem is that when I send three messages in the appropriate format, the pattern is not recognized.

I also tried changing the pattern like this:

val pattern = Pattern
  .begin[Event]("before")
  .where(e => e.getType == "position")
  .next("recognized")
  .where(e => e.getType == "recognition")
  .next("after")
  .where(e => e.getType == "position")

This pattern works, but later I can't cast the Event instances to Position or Recognized.

What am I missing here?

Maybe the elements that you pass to the pattern are plain Events? - Jiayi Liao
That’s right, but isn't it possible to have different types of events, order them ascending by event time, and find a pattern in them? Whether all events come from one topic or each event type has its own topic shouldn't make a difference. - Daniel Eisenreich
Did you initialize the objects with the subtype when deserializing from Kafka? - Jiayi Liao
I just deserialize everything as Event with val kafkaSource = new FlinkKafkaConsumer("sp", new EventSchema, properties), because at runtime multiple types are in one topic. But can I combine multiple kafkaSources with different types into one? - Daniel Eisenreich
Can you post the code of EventSchema here? I tried it according to your description, and it works for me. - Jiayi Liao

1 Answer

Based on the comments, I think your deserializer should return instances of the subtypes instead of plain Event instances. Here is some example code:

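// First pass: read only the common fields to find out the concrete type
val event = mapper.readValue(bytes, classOf[Event])
// Second pass: re-read the payload as the matching subtype
val typed: Event = event.getType match {
  case "position" => mapper.readValue(bytes, classOf[Position])
  case "recognition" => mapper.readValue(bytes, classOf[Recognized])
  case _ => event // unknown type: fall back to the plain Event
}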
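To illustrate why the runtime class matters, here is a minimal, self-contained Java sketch without Flink or Jackson. It assumes that subtype(...) decides based on the runtime class of each element (as far as I know via Class.isAssignableFrom); the names SubtypeSketch, matchesSubtype and toConcrete are hypothetical and only stand in for the real classes and for the deserializer.

```java
public class SubtypeSketch {

    // Simplified stand-ins for the Event hierarchy described in the question.
    static class Event {
        private final String type;
        Event(String type) { this.type = type; }
        String getType() { return type; }
    }

    static class Position extends Event {
        Position() { super("position"); }
    }

    static class Recognized extends Event {
        Recognized() { super("recognition"); }
    }

    // Assumption: a subtype condition tests the runtime class of the element,
    // not the value of its "type" field.
    static boolean matchesSubtype(Class<? extends Event> subtype, Event value) {
        return subtype.isAssignableFrom(value.getClass());
    }

    // Hypothetical stand-in for the deserializer: turn a generically parsed
    // Event into an instance of the concrete subclass named by its type field.
    static Event toConcrete(Event parsed) {
        switch (parsed.getType()) {
            case "position":
                return new Position();
            case "recognition":
                return new Recognized();
            default:
                return parsed; // unknown type: keep the plain Event
        }
    }

    public static void main(String[] args) {
        Event generic = new Event("position");

        // A plain Event is not a Position, even though its type field says so.
        System.out.println(matchesSubtype(Position.class, generic));             // false

        // After converting to the concrete subclass, the check succeeds.
        System.out.println(matchesSubtype(Position.class, toConcrete(generic))); // true
    }
}
```

The same reasoning would explain the failing cast: an object created as a plain Event cannot be downcast to Position, regardless of what its type field contains.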

I also successfully ran the following example, taken from a test case in CEPITCase.java:

DataStream<Event> input = env.fromElements(
  new Event(1, "foo", 4.0),
  new SubEvent(2, "foo", 4.0, 1.0),
  new SubEvent(3, "foo", 4.0, 1.0),
  new SubEvent(4, "foo", 4.0, 1.0),
  new Event(5, "middle", 5.0)
);

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").subtype(SubEvent.class)
  .followedByAny("middle").subtype(SubEvent.class)
  .followedByAny("end").subtype(SubEvent.class);