I'm currently working on a semester project in which I have to recognize a sequence of three events, such as P -> R -> P (Position, Recognized, Position).
There are two event types, and both are consumed from the same Kafka topic through the Kafka connector.
I created a parent class called Event from which the other two types derive.
The Kafka connector deserializes the JSON with the EventSchema into the parent class Event.
val consumer = new FlinkKafkaConsumer("events", new EventSchema, properties)
val stream = env.addSource(consumer)
The pattern looks like this:
val pattern = Pattern
  .begin[Event]("before")
  .subtype(classOf[Position])
  .next("recognized")
  .subtype(classOf[Recognized])
  .next("after")
  .subtype(classOf[Position])
The problem is that when I send three messages in the appropriate format, the pattern is not recognized.
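One possible cause, assuming EventSchema always instantiates the parent class Event: a subtype condition is a check on the runtime class of each element, so it cannot match objects that were constructed as plain Event, whatever their type field says. A minimal sketch in plain Scala (no Flink), where the class bodies and the `isSubtype` helper are hypothetical stand-ins and not code from this project:

```scala
// Hypothetical stand-ins for the classes in the question.
class Event(val getType: String)
class Position(val x: Double, val y: Double) extends Event("position")
class Recognized(val label: String) extends Event("recognition")

object SubtypeCheck {
  // Mimics a runtime-class check of the kind a subtype condition relies on.
  def isSubtype[T <: Event](clazz: Class[T], e: Event): Boolean =
    clazz.isInstance(e)

  def main(args: Array[String]): Unit = {
    // What a schema that always builds the parent class would emit.
    val fromParentSchema: Event = new Event("position")
    // What a schema that picks the concrete class would emit.
    val concrete: Event = new Position(1.0, 2.0)

    println(isSubtype(classOf[Position], fromParentSchema)) // false
    println(isSubtype(classOf[Position], concrete))         // true
  }
}
```

If this is what happens here, the `where` variant would work because it only inspects the type field, while the `subtype` variant would reject every element.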
I also tried changing the pattern like this:
val pattern = Pattern
  .begin[Event]("before")
  .where(e => e.getType == "position")
  .next("recognized")
  .where(e => e.getType == "recognition")
  .next("after")
  .where(e => e.getType == "position")
This pattern works, but afterwards I can't cast the matched Event objects to Position or Recognized.
What am I missing here?
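A downcast only succeeds when the object was created as that subclass, so one direction to explore is letting the deserializer choose the concrete class from the type field. A minimal sketch, assuming the JSON has already been parsed into a field map; the `fromFields` and `describe` helpers, the field names `x`, `y` and `label`, and the class bodies are hypothetical and not part of the original code:

```scala
import scala.util.Try

// Hypothetical stand-ins for the classes in the question.
sealed abstract class Event(val getType: String)
final case class Position(x: Double, y: Double) extends Event("position")
final case class Recognized(label: String) extends Event("recognition")

object EventFactory {
  // Builds the concrete subclass from already-parsed JSON fields.
  // Returns None for unknown types or missing/malformed fields.
  def fromFields(fields: Map[String, String]): Option[Event] =
    fields.get("type") match {
      case Some("position") =>
        for {
          x <- fields.get("x").flatMap(s => Try(s.toDouble).toOption)
          y <- fields.get("y").flatMap(s => Try(s.toDouble).toOption)
        } yield Position(x, y)
      case Some("recognition") =>
        fields.get("label").map(Recognized(_))
      case _ => None
    }

  // Pattern matching on the runtime class replaces the unchecked cast.
  def describe(e: Event): String = e match {
    case Position(x, y) => s"position at ($x, $y)"
    case Recognized(l)  => s"recognized $l"
  }
}
```

With events built this way, a subtype condition has concrete classes to match against, and the code that handles a match can use a pattern match instead of `asInstanceOf`.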
val kafkaSource = new FlinkKafkaConsumer("sp", new EventSchema, properties)
because at runtime multiple types arrive in one topic. But can I combine multiple Kafka sources with different types into one? – Daniel Eisenreich