I am working on a real-time project with Flink, and I need to enrich the state of each card with its prior transactions in order to compute transaction features, as follows:
For each card, I have a feature that counts the number of transactions in the past 24 hours. I have two data sources:
First, a database table which stores the transactions of cards until the end of yesterday.
Second, the stream of today's transactions.
So the first step is to fetch yesterday's transactions for each card from the database and store them in the card's state. The second step is to update this state with today's transactions as they arrive on the stream, and to compute the number of transactions in the past 24 hours for each of them. I tried reading the database data as a stream and connecting it to today's transaction stream, using a RichCoFlatMapFunction. However, because the database data is not inherently a stream, the output was not correct. The RichCoFlatMapFunction is as follows:
    import java.util.ArrayList;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;

    transactionsHistory
        .connect(transactionsStream)
        .flatMap(new RichCoFlatMapFunction<History, Tuple2<String, Transaction>, ExtractedFeatures>() {

            // Per-card history. ValueState is keyed state, so both inputs must be keyed by card.
            private ValueState<History> historyState;

            @Override
            public void open(Configuration config) throws Exception {
                historyState = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("card history", History.class));
            }

            // Historical data from the database
            @Override
            public void flatMap1(History history,
                                 Collector<ExtractedFeatures> collector) throws Exception {
                historyState.update(history);
            }

            // New transactions from the stream
            @Override
            public void flatMap2(Tuple2<String, Transaction> transactionTuple,
                                 Collector<ExtractedFeatures> collector) throws Exception {
                // Note: value() returns null if no history has been stored for this card yet.
                History history = historyState.value();
                Transaction transaction = transactionTuple.f1;

                // findHistoricalDate returns the transactions that fall within the
                // 24-hour window of the current transaction, together with their count.
                Tuple2<ArrayList<History>, Integer> prevDayHistoryTuple =
                        findHistoricalDate(history.prevDayTransactions,
                                transaction.transactionLocalDate);

                // Keep only the transactions that are still inside the window.
                history.prevDayTransactions = prevDayHistoryTuple.f0;
                historyState.update(history);

                ExtractedFeatures ef = new ExtractedFeatures();
                ef.updateFeatures(transaction, prevDayHistoryTuple.f1);
                collector.collect(ef);
            }
        });
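To make the feature itself concrete, here is a minimal plain-Java sketch of the 24-hour count, independent of Flink. It is a simplified illustration and not my actual code: the class `RollingDayCounter` and its method names are hypothetical, timestamps are assumed to be epoch milliseconds, and transactions are assumed to arrive in non-decreasing time order (historical ones first).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper: counts a card's transactions in the 24 hours before each new one.
class RollingDayCounter {
    private static final long DAY_MILLIS = 24L * 60 * 60 * 1000;

    // Timestamps of transactions that may still fall inside the window, oldest first.
    private final Deque<Long> timestamps = new ArrayDeque<>();

    // Seeds the window with a historical transaction loaded from the database.
    public void addHistorical(long tsMillis) {
        timestamps.addLast(tsMillis);
    }

    // Returns how many earlier transactions lie in (tsMillis - 24h, tsMillis],
    // then records the current transaction for later calls.
    public int countAndAdd(long tsMillis) {
        long cutoff = tsMillis - DAY_MILLIS;
        // Evict everything that is 24 hours old or older.
        while (!timestamps.isEmpty() && timestamps.peekFirst() <= cutoff) {
            timestamps.pollFirst();
        }
        int count = timestamps.size();
        timestamps.addLast(tsMillis);
        return count;
    }
}
```

In the Flink job, this per-card logic only gives the right answer if the historical transactions for a card have been stored before the first streamed transaction for that card is processed, which is exactly the ordering I cannot guarantee with the connected streams.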
What is the right design pattern to achieve the above enrichment requirement in a Flink streaming program? I found the question below on Stack Overflow, which is similar to mine, but I couldn’t solve my problem with it, so I decided to ask for help :)
Enriching DataStream using static DataSet in Flink streaming
Any help would be really appreciated.