The input stream is a JSON array of objects. Each object has a field named "State", and we need to split the input stream by its value. For example:
Object1 -> "State":"Active"
Object2 -> "State":"Idle"
Object3 -> "State":"Blocked"
Object4 -> "State":"Active"
We have to start a processing thread as soon as we receive a particular state, and then keep consuming data. If a new object's state is the same as the previous state, the existing thread should handle it; otherwise a new thread should be started for the new state. Each thread must also run for a finite time, and all the threads should run in parallel.
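To make the requirement concrete, here is a minimal plain-Java sketch of the behaviour I am after. It is not Flink code. The class name StateRouter and its methods are hypothetical, and it assumes one worker per distinct state value (so Object4 would go back to the worker already started for "Active") and that "finite time" means a bounded wait at shutdown. I assume keying the stream by "State" with keyBy would be the rough Flink equivalent, but I am not sure.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: routes each object to a worker chosen by its state.
class StateRouter {
    // One single-threaded worker per distinct state (assumption, see above).
    private final Map<String, ExecutorService> workers = new LinkedHashMap<>();
    // Records which objects each state's worker handled.
    private final Map<String, List<String>> handled = new ConcurrentHashMap<>();

    // Called from a single reader thread, once per incoming object.
    void accept(String state, String payload) {
        // Start a worker the first time a state is seen; reuse it afterwards.
        ExecutorService worker =
            workers.computeIfAbsent(state, s -> Executors.newSingleThreadExecutor());
        List<String> sink =
            handled.computeIfAbsent(state, s -> Collections.synchronizedList(new ArrayList<>()));
        // The "processing" here is just recording the payload.
        worker.submit(() -> sink.add(payload));
    }

    // Stops accepting work and gives each worker a bounded time to finish.
    Map<String, List<String>> shutdown(long timeoutMillis) {
        for (ExecutorService w : workers.values()) {
            w.shutdown();
        }
        for (ExecutorService w : workers.values()) {
            try {
                if (!w.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    w.shutdownNow(); // enforce the finite lifetime
                }
            } catch (InterruptedException e) {
                w.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        return handled;
    }
}
```

With the four example objects above, the "Active" worker would receive Object1 and Object4, while "Idle" and "Blocked" would each get their own worker running in parallel.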
Please suggest how I can do this in Apache Flink. Pseudocode and links would be helpful.