
I am new to Flink and want to understand how to implement my use case. The application has three input sources: a) historical data, b) live events from Kafka, and c) a control event that carries the trigger condition.

Since the application deals with historical data, my plan was to merge the historical and live data and create a table on the merged stream.
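Concretely, I was planning something along these lines (a rough sketch; historical, live, and the Event type stand in for my real sources and types):

DataStream<Event> historical = ...;   // bounded source with the historical data
DataStream<Event> live = ...;         // Kafka source with the live events

// union the two streams and register the result as a table
DataStream<Event> merged = historical.union(live);
Table inputTable = tableEnv.fromDataStream(merged);
tableEnv.createTemporaryView("tableName", inputTable);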

To trigger the event, we have to build the SQL query from the control event, which arrives as an input source and holds the WHERE clause.

My problem is building the SQL query while the data is in a stream. When I do something like

DataStream<ControlEvent> controlEvent = ...;

controlEvent.flatMap(new FlatMapFunction<ControlEvent, String>() {
    @Override
    public void flatMap(ControlEvent event, Collector<String> out) {
        // tableEnv is captured from the enclosing scope here
        tableEnv.executeSql("select * from tableName");   // throws a serialization exception
    }
});

it throws a not-serializable exception for LocalExecutionEnvironment, because the table environment captured by the flatMap function cannot be serialized and shipped with the job.


1 Answer


That sort of dynamic query injection is not (yet) supported by Flink SQL.

Update:

Given what you've said about your requirements -- that the variations in the queries will be limited -- what you might do instead is implement this with the DataStream API rather than SQL. This would probably be a KeyedBroadcastProcessFunction that holds some keyed state, and you could broadcast in the updates to the query/queries; a sketch follows.
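As a rough sketch of what that could look like (the Event and Rule POJOs, the field names, and the threshold check are made-up placeholders, not anything from your question):

import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class RuleMatcher
        extends KeyedBroadcastProcessFunction<String, RuleMatcher.Event, RuleMatcher.Rule, String> {

    // placeholder POJOs standing in for your event and control-event types
    public static class Event { public String key; public double value; }
    public static class Rule {
        public String id;
        public double threshold;
        public boolean matches(Event e) { return e.value > threshold; }
    }

    // broadcast state descriptor: every parallel instance sees the same rule set
    public static final MapStateDescriptor<String, Rule> RULES =
            new MapStateDescriptor<>("rules", Types.STRING, Types.POJO(Rule.class));

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        // check each incoming (historical or live) event against the broadcast rules
        for (Map.Entry<String, Rule> entry : ctx.getBroadcastState(RULES).immutableEntries()) {
            if (entry.getValue().matches(event)) {
                out.collect("rule " + entry.getKey() + " matched event with key " + event.key);
            }
        }
    }

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<String> out)
            throws Exception {
        // a control event arrived: store (or replace) the rule for future events
        ctx.getBroadcastState(RULES).put(rule.id, rule);
    }
}

The wiring then broadcasts the control stream and connects it to the keyed event stream, roughly:

DataStream<RuleMatcher.Event> events = ...;    // merged historical + live events
DataStream<RuleMatcher.Rule> controls = ...;   // control events

events.keyBy(e -> e.key)
      .connect(controls.broadcast(RuleMatcher.RULES))
      .process(new RuleMatcher());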

Take a look at the Fraud Detection Demo as an example of how to build this sort of thing with Flink.