
I am trying to derive a new table from a dynamic table and a stream, joined on some of their fields.

Can someone suggest the best way to do this? I am new to Flink and still experimenting.

//Dynamic Table

Table books = tEnv.sqlQuery("SELECT bookId, instrument, sum(tradedQuanity) as totalQuantity FROM tradeStreamTable group by bookId, instrument");

tEnv.registerTable("books", books);

BOOK

============================
BookId, Instrument, Quantity
Book1, Goog, 100
Book2, Vod, 10
Book1, Appl, 50
Book2, Goog, 60
Book1, Vod, 130
Book3, Appl, 110

//My Stream

tEnv.registerDataStream("allInstrumentsTable", allInstruments, "timeStampMs, instrument, instrumentValue ");

allInstrumentsTable

=========================================
timeStampMs, instrument, instrumentValue (price)
Stream ......

=========================================

I want to derive a new (dynamic) table that is updated whenever the books table changes or a new instrumentValue arrives on the stream for an instrument. The join is on instrument, and the derived column is instrumentValue * totalQuantity.

BOOK - With latest Price ( new Table )

======================================
BookId, Instrument, Quantity, instrumentValue * totalQuantity
Book1, Goog, 100, 1203
Book1, Appl, 50, ...
Book1, Vod, 130, ...
Book2, Vod, 10, ...
Book2, Goog, 60, ...
Book3, Appl, 110, ...


1 Answer


You cannot "update" a table externally. Tables on streams are like materialized views in an RDBMS: each one is a derived view, at a particular moment in time, of the state of the stream.
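To make the materialized-view analogy concrete, here is a minimal plain-Java sketch of what the `books` query conceptually maintains. It is not Flink API; the class `BooksView` and its methods are hypothetical names used only for illustration, and it ignores retractions, parallelism and fault tolerance.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration only; BooksView is a hypothetical name, not part of Flink.
final class BooksView {
    // Conceptual current result of:
    //   SELECT bookId, instrument, SUM(tradedQuanity) ... GROUP BY bookId, instrument
    // keyed by "bookId|instrument".
    private final Map<String, Long> totals = new LinkedHashMap<>();

    // Apply one trade event and return the updated total for its key.
    // The only way the "table" changes is through events like this one.
    long onTrade(String bookId, String instrument, long tradedQuantity) {
        return totals.merge(bookId + "|" + instrument, tradedQuantity, Long::sum);
    }

    // Read the current total for a key; 0 if no trade has been seen for it.
    long total(String bookId, String instrument) {
        return totals.getOrDefault(bookId + "|" + instrument, 0L);
    }
}
```

There is deliberately no setter here: the view is only ever changed by feeding it more trade events.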

What you can do, though, is derive a new table from those two:

SELECT a.instrument, a.instrumentValue * b.totalQuantity FROM allInstrumentsTable a JOIN books b ON a.instrument = b.instrument;

Because this is a non-windowed join, you should also configure a state retention policy (for example, an idle state retention time) so that the join state does not grow indefinitely.
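As a rough illustration of why the state grows, here is a simplified plain-Java sketch of the bookkeeping a non-windowed join implies. It is not Flink API and not how the runtime is implemented; `JoinStateSketch`, `expireOlderThan` and the age-based cutoff are hypothetical and stand in for whatever retention setting your Flink version offers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; all names here are hypothetical, not Flink API.
final class JoinStateSketch {
    static final class Price {
        final long timeStampMs;
        final double value;
        Price(long timeStampMs, double value) {
            this.timeStampMs = timeStampMs;
            this.value = value;
        }
    }

    // Stream side: every price row seen so far, per instrument.
    private final Map<String, List<Price>> prices = new HashMap<>();
    // Table side: instrument -> (bookId -> totalQuantity).
    private final Map<String, Map<String, Long>> quantities = new HashMap<>();

    // A book row changed. Simplification: a real join would also match this
    // update against the stored price rows, which is why those rows are kept.
    void onBookUpdate(String bookId, String instrument, long totalQuantity) {
        quantities.computeIfAbsent(instrument, k -> new HashMap<>())
                  .put(bookId, totalQuantity);
    }

    // A price row arrived: store it, then emit instrumentValue * totalQuantity
    // once for every book that currently holds the instrument.
    List<Double> onPrice(long timeStampMs, String instrument, double value) {
        prices.computeIfAbsent(instrument, k -> new ArrayList<>())
              .add(new Price(timeStampMs, value));
        List<Double> joined = new ArrayList<>();
        for (long quantity : quantities
                .getOrDefault(instrument, Collections.emptyMap()).values()) {
            joined.add(value * quantity);
        }
        return joined;
    }

    // Number of price rows held in state; grows with every price event.
    int storedPriceRows() {
        int count = 0;
        for (List<Price> rows : prices.values()) {
            count += rows.size();
        }
        return count;
    }

    // Hypothetical retention policy: drop price rows older than the cutoff.
    void expireOlderThan(long cutoffMs) {
        for (List<Price> rows : prices.values()) {
            rows.removeIf(p -> p.timeStampMs < cutoffMs);
        }
    }
}
```

Without something like `expireOlderThan`, `storedPriceRows()` only ever increases, which is the unbounded growth the retention policy is meant to prevent. Note also that each price row produces its own joined rows, so the result is not automatically limited to the latest price per instrument.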