I am trying to derive a new table from a dynamic table and a stream, joined on a shared field.
Can someone suggest the best way to do this? I am new to Flink and still experimenting.
//Dynamic Table
Table books = tEnv.sqlQuery("SELECT bookId, instrument, sum(tradedQuanity) as totalQuantity FROM tradeStreamTable group by bookId, instrument");
tEnv.registerTable("books", books);
BOOK
============================
BookId, Instrument, Quantity
Book1,  Goog,       100
Book2,  Vod,        10
Book1,  Appl,       50
Book2,  Goog,       60
Book1,  Vod,        130
Book3,  Appl,       110
//My Stream
tEnv.registerDataStream("allInstrumentsTable", allInstruments, "timeStampMs, instrument, instrumentValue ");
allInstrumentsTable
=========================================
timeStampMs, instrument, instrumentValue (price)
Stream ......
=========================================
I want to derive a new dynamic table that updates whenever the books table changes or a new instrumentValue arrives for an instrument on the stream. The two should be joined on instrument, with an extra column computed as instrumentValue * totalQuantity.
BOOK - With latest Price (new table)
======================================
BookId, Instrument, Quantity, instrumentValue * totalQuantity
Book1,  Goog,       100,      1203
Book1,  Appl,       50,       ...
Book1,  Vod,        130,      ...
Book2,  Vod,        10,       ...
Book2,  Goog,       60,       ...
Book3,  Appl,       110,      ...
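
To make the intended behaviour concrete, here is a rough plain-Java sketch of the result I am after. It does not use Flink at all; the class and method names (BookValuation, onTrade, onPrice, value) are made up for illustration, and the prices in main are invented. It also assumes that the most recently arrived price is the latest one, so it ignores timeStampMs.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of the desired result table (hypothetical names, no Flink). */
final class BookValuation {
    // Running totalQuantity per "bookId|instrument", mirroring the GROUP BY in the books query.
    private final Map<String, Long> totalQuantity = new HashMap<>();
    // Last instrumentValue seen per instrument on the stream.
    private final Map<String, Double> latestPrice = new HashMap<>();

    /** A new trade changes the aggregated quantity for one (book, instrument) row. */
    void onTrade(String bookId, String instrument, long tradedQuantity) {
        totalQuantity.merge(bookId + "|" + instrument, tradedQuantity, Long::sum);
    }

    /** A new stream element replaces the previous price for that instrument. */
    void onPrice(String instrument, double instrumentValue) {
        latestPrice.put(instrument, instrumentValue);
    }

    /**
     * instrumentValue * totalQuantity for one row, or null when either side
     * is missing (inner-join behaviour is an assumption here).
     */
    Double value(String bookId, String instrument) {
        Long quantity = totalQuantity.get(bookId + "|" + instrument);
        Double price = latestPrice.get(instrument);
        if (quantity == null || price == null) {
            return null;
        }
        return quantity * price;
    }

    public static void main(String[] args) {
        BookValuation valuation = new BookValuation();
        valuation.onTrade("Book1", "Goog", 60);
        valuation.onTrade("Book1", "Goog", 40);   // totalQuantity is now 100
        valuation.onPrice("Goog", 12.5);          // invented price
        System.out.println(valuation.value("Book1", "Goog"));
        valuation.onPrice("Goog", 13.0);          // a new price updates the same row
        System.out.println(valuation.value("Book1", "Goog"));
    }
}
```

In other words, every row of the new table should be recomputed when either its quantity or its instrument's price changes, and I am looking for the idiomatic way to express this in Flink.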