
I would like to know whether Flink can support my requirement. I have gone through a lot of articles, but I am not sure whether my case can be solved.

Case: I have two input sources: a) Event and b) ControlSet. Sample event data:

event 1:

{
   "id": 100,
   "data": {
      "name": "abc"
   }
}

event 2:

{
   "id": 500,
   "data": {
      "date": "2020-07-10",
      "name": "event2"
   }
}

If you look at event-1 and event-2, they have different attributes inside "data". So treat "data" as a free-form field: the attribute names may be the same or different across events.

The ControlSet gives us the instruction to execute a trigger. For example, a trigger condition could be:

(id = 100 && name = abc) OR (id =500 && date ="2020-07-10")

Please help me figure out whether this kind of scenario can run in Flink, and what the best approach would be. I don't think CEP patterns or SQL can help here, and I am not sure whether the event DataStream can carry JSON objects and be queried with something like a JSON path.

Are these trigger conditions supplied at compile time, or while the application is running? – David Anderson
Is there any fixed pattern to how the ids are used in the queries? Do you need to scale this out so that it runs in parallel? – David Anderson
Thanks David for the quick reply. Yes, these conditions will come at run time, and if parallelism can be supported, then great! But the challenge is the unpredictable event attributes in "data" (it is like a Map<String, String>). – Ashutosh

1 Answer


Yes, this can be done with Flink. And you are right that CEP and SQL won't help, since they require the pattern to be known at compile time.

For the event stream, I propose to key this stream by the id, and to store the attribute/value data in keyed MapState, which is a kind of keyed state that Flink knows how to manage, checkpoint, restore, and rescale as necessary. This gives us a distributed map, mapping ids to hash maps holding the data for each id.
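Outside of Flink, the bookkeeping this state performs amounts to one attribute→value map per id. A minimal plain-Java sketch of that idea (the `EventState` class and a `HashMap` here stand in for Flink's keyed `MapState`, which Flink would scope to the current key automatically):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for the per-key state held in Flink's MapState.
// In the real job, the stream is keyed by id, so each parallel instance
// only sees and stores the attribute/value pairs for its own keys.
public class EventState {
    // id -> (attribute -> value); Flink would manage one inner map per key.
    private final Map<Long, Map<String, String>> stateByKey = new HashMap<>();

    // Called for each incoming event: merge its "data" map into keyed state.
    public void update(long id, Map<String, String> data) {
        stateByKey.computeIfAbsent(id, k -> new HashMap<>()).putAll(data);
    }

    // Lookup used later, when control queries arrive.
    public String get(long id, String attr) {
        Map<String, String> attrs = stateByKey.get(id);
        return attrs == null ? null : attrs.get(attr);
    }
}
```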

For the control stream, let me first describe a solution for a simplified version where the control queries are of the form

(id == key) && (attr == value)

We can simply key this stream by the id in the query (i.e., key), and connect this stream to the event stream. We'll use a RichCoProcessFunction to hold the MapState described above, and as these queries arrive, we can look to see what data we have for key, and check if map[attr] == value.
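Since both streams are keyed by id, the query arrives at exactly the instance holding that key's state, and the check is a single map lookup. A sketch of the check done when a query element arrives (`SimpleQueryCheck` is an illustrative name, not Flink API; the `state` parameter stands in for the keyed MapState for this id):

```java
import java.util.Map;

// Sketch of the check for a simple query (id == key) && (attr == value),
// as it would run inside RichCoProcessFunction.processElement for the
// control-stream side. 'state' is the attr/value map stored for this key.
public class SimpleQueryCheck {
    public static boolean matches(Map<String, String> state, String attr, String value) {
        // No state seen yet for this key means the query cannot match.
        return state != null && value.equals(state.get(attr));
    }
}
```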

To handle more complex queries, like the one in the question

(id1 == key1 && attr1 == value1) OR (id2 == key2 && attr2 == value2)

we can do something more complex.

Here we will need to assign a unique id to each control query.
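Concretely, such a query could be modeled as a unique id plus a list of OR'ed clauses, each of the form (id == key && attr == value). A sketch of that representation (`ControlQuery` and `Clause` are illustrative names, not part of any Flink API):

```java
import java.util.List;

// Illustrative representation of a control query: a unique id for downstream
// reassembly, plus a list of clauses combined with OR semantics.
public class ControlQuery {
    public final String queryId;       // unique per query
    public final List<Clause> clauses; // e.g. (100, "name", "abc") OR (500, "date", "2020-07-10")

    public static class Clause {
        public final long key;     // the event id this clause refers to
        public final String attr;  // attribute name inside "data"
        public final String value; // expected value
        public Clause(long key, String attr, String value) {
            this.key = key; this.attr = attr; this.value = value;
        }
    }

    public ControlQuery(String queryId, List<Clause> clauses) {
        this.queryId = queryId;
        this.clauses = clauses;
    }
}
```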

One approach would be to broadcast these queries to a KeyedBroadcastProcessFunction that once again holds the MapState described above. In the processBroadcastElement method, each instance can use applyToKeyedState to check the validity of the components of the query for which that instance is storing keyed state (the attr/value pairs derived from the data field in the event stream). For each keyed component of the query where an instance can supply the requested info, it emits a result downstream.

Then after the KeyedBroadcastProcessFunction we key the stream by the control query id, and use a KeyedProcessFunction to assemble together all of the responses from the various instances of the KeyedBroadcastProcessFunction, and to determine the final result of the control/query message.
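With OR semantics, that assembly step can decide a query as soon as one clause matches, or declare it failed once every clause has reported. A minimal sketch of that logic (`QueryAssembler` is an illustrative name; in the real job this bookkeeping would live in keyed state inside the KeyedProcessFunction, keyed by query id):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the downstream assembly: partial results arrive keyed by query
// id; return TRUE/FALSE once the outcome is decided, null while pending.
public class QueryAssembler {
    // queryId -> number of clause results still outstanding
    private final Map<String, Integer> pending = new HashMap<>();

    public Boolean onPartialResult(String queryId, int totalClauses, boolean clauseMatched) {
        if (clauseMatched) {
            pending.remove(queryId); // OR semantics: one match decides the query
            return Boolean.TRUE;
        }
        int remaining = pending.getOrDefault(queryId, totalClauses) - 1;
        if (remaining == 0) {
            pending.remove(queryId); // every clause reported, none matched
            return Boolean.FALSE;
        }
        pending.put(queryId, remaining);
        return null; // still waiting for more clause results
    }
}
```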

It's not really necessary to use broadcast here, but I found this scheme a little more straightforward to explain. But you could instead route keyed copies of the query to only the instances of the RichCoProcessFunction holding MapState for the keys used in the control query, and then do the same sort of assembly of the final result afterwards.

That may have been hard to follow. What I've proposed composes two techniques I've coded up before in examples: https://github.com/alpinegizmo/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java uses broadcast to trigger the evaluation of query predicates across keyed state, and https://gist.github.com/alpinegizmo/5d5f24397a6db7d8fabc1b12a15eeca6 uses a unique id to reassemble a single response after doing multiple enrichments in parallel.