Here are my requirements: there are 3 Kafka topics, topic1-device, topic2-consumer and topic3-order. We are going to calculate the order amount per device over the past 12 hours using Flink SQL. I did the following:
- Register 3 tables corresponding to the 3 Kafka topics:
// java code
String creConsumer = "CREATE TABLE " + consumerTable + " (" +
" deviceId STRING" +
",deviceFingerprintHash STRING" +
",consumer ROW(consumerUuid STRING)" +
",eventInfo ROW<eventTime BIGINT>" +
",id BIGINT" +
",ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime/1000, 'yyyy-MM-dd HH:mm:ss'))" +
",WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
") WITH (...)";
String createToken = "CREATE TABLE " + orderTokenTable + " (" +
"sessionId BIGINT" +
",token STRING" +
",eventInfo ROW(eventTime BIGINT)" +
",ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime/1000, 'yyyy-MM-dd HH:mm:ss'))" +
",WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
") WITH (...)";
String createTransaction = "CREATE TABLE " + orderTransactionTable + " (" +
"orderTransactionId BIGINT" +
",consumer ROW(`consumerUuid` STRING)" +
",token STRING" +
",countryCode STRING" +
",consumerTotalAmount ROW<amount STRING>" +
",status STRING" +
",eventInfo ROW<eventTime BIGINT>" +
",ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime/1000, 'yyyy-MM-dd HH:mm:ss'))" +
",WATERMARK FOR ts AS ts - INTERVAL '1' SECOND" + // 1000 ms offset
") WITH (...)";
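For reference, a plain-Java sketch (not Flink code) of what the computed ts column and the ts - INTERVAL '5' SECOND watermark evaluate to. The session time zone of UTC+8 is an assumption inferred from the sample output further down (eventTime 1628564685939 shows up as 2021-08-10T11:04:45); the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Plain-Java mirror of the DDL's computed column and watermark (not Flink code):
//   ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime/1000, 'yyyy-MM-dd HH:mm:ss'))
//   WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
class EventTimeSketch {

    // eventTime / 1000 drops the milliseconds, then the epoch seconds are rendered
    // as a local date-time in the session time zone (passed in explicitly here).
    static LocalDateTime toTs(long eventTimeMillis, ZoneId sessionZone) {
        return LocalDateTime.ofInstant(Instant.ofEpochSecond(eventTimeMillis / 1000), sessionZone);
    }

    // Bounded out-of-orderness: the watermark trails the largest ts seen by 5 seconds.
    static LocalDateTime watermark(LocalDateTime maxTsSeen) {
        return maxTsSeen.minus(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("+08:00"); // assumption: inferred from the sample output
        LocalDateTime ts = toTs(1628564685939L, zone);
        System.out.println(ts);            // 2021-08-10T11:04:45
        System.out.println(watermark(ts)); // 2021-08-10T11:04:40
    }
}
```

Because of the integer division, the millisecond part of eventTime is lost, so all three sample rows land on whole seconds.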
- Join the 3 tables to create a view:
// java code
String createWideTable = "CREATE VIEW view_order_consumer AS " +
"SELECT " +
" otc.eventInfo.eventTime AS eventTime " +
",otc.orderTransactionId " +
",otc.token " +
",otc.consumer.consumerUuid AS consumerUuid " +
",otc.countryCode " +
",CAST(otc.consumerTotalAmount.amount AS DOUBLE) AS amount " +
",otc.status " +
",csc.deviceId " +
",csc.deviceFingerprintHash " +
",otc.ts " +
"FROM " +
" order_transaction_completed otc " +
"INNER JOIN order_token_added ota " +
" ON (otc.token=ota.token AND otc.ts BETWEEN ota.ts - INTERVAL '10' DAY AND ota.ts + INTERVAL '10' DAY) " +
"INNER JOIN consumer_session_created csc " +
" ON (ota.sessionId=csc.id AND csc.ts BETWEEN otc.ts - INTERVAL '10' DAY AND otc.ts) ";
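Both ON clauses in the view bound one table's ts relative to the other's. A plain-Java sketch (not Flink code) of which timestamp pairs each time condition accepts, using inclusive bounds like SQL BETWEEN; the class name and the sample timestamps are hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Plain-Java mirror of the two time conditions in the view's ON clauses (not Flink code).
class IntervalJoinSketch {

    static final Duration TEN_DAYS = Duration.ofDays(10);

    // otc.ts BETWEEN ota.ts - INTERVAL '10' DAY AND ota.ts + INTERVAL '10' DAY
    static boolean tokenJoinMatches(LocalDateTime otcTs, LocalDateTime otaTs) {
        return !otcTs.isBefore(otaTs.minus(TEN_DAYS)) && !otcTs.isAfter(otaTs.plus(TEN_DAYS));
    }

    // csc.ts BETWEEN otc.ts - INTERVAL '10' DAY AND otc.ts
    static boolean sessionJoinMatches(LocalDateTime cscTs, LocalDateTime otcTs) {
        return !cscTs.isBefore(otcTs.minus(TEN_DAYS)) && !cscTs.isAfter(otcTs);
    }

    public static void main(String[] args) {
        LocalDateTime order = LocalDateTime.of(2021, 8, 10, 11, 4, 45); // hypothetical
        System.out.println(tokenJoinMatches(order, order.minusDays(3)));     // true
        System.out.println(sessionJoinMatches(order.minusHours(1), order));  // true
        System.out.println(sessionJoinMatches(order.plusSeconds(1), order)); // false: session after order
    }
}
```

The first condition is symmetric (plus or minus 10 days), while the second only accepts sessions created at or before the order time.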
- Run the aggregation with a window aggregate (HOP window):
// SQL
select deviceId
,HOP_START(voc.ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND)
,count(1) as cnt
from view_order_consumer as voc
group by HOP(voc.ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND)
,deviceId
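A plain-Java sketch of how HOP(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND) assigns a row to windows (the second argument is the slide, the third the size) and when an event-time window can emit. It approximates the usual assignment rule and the "watermark reaches window end minus 1 ms" trigger condition; it is not Flink's implementation, and the millisecond values are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Approximation of HOP window assignment and event-time firing (not Flink code).
// Timestamps are epoch milliseconds.
class HopWindowSketch {

    // Every window [start, start + size) whose start is a multiple of slide and contains ts.
    static List<long[]> assign(long ts, long slide, long size) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    // A window [start, end) can emit once the watermark reaches end - 1 ms.
    static boolean canFire(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long slide = 5_000, size = 10_000;
        for (long[] w : assign(47_000, slide, size)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")"); // [45000, 55000) then [40000, 50000)
        }
        // With WATERMARK = ts - 5 s, a row at 47 s yields watermark 42 s: nothing fires yet.
        System.out.println(canFire(50_000, 47_000 - 5_000)); // false
    }
}
```

With the 5-second watermark delay, the window [40000, 50000) in this sketch only emits after a later row pushes the watermark to 49999, i.e. a row with ts of at least 54999.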
- Print the result table:
// java
DataStream<Tuple2<Boolean, Row>> retractResultStream = tableEnvironment
.toRetractStream(table, Row.class);
retractResultStream.print();
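toRetractStream emits (flag, row) pairs: true adds a row to the result and false retracts a previously emitted row, which is why each line of the output below starts with true. A plain-Java sketch of rebuilding the current result from such messages, with rows modeled as strings (the Change class is a hypothetical stand-in, not the Flink Row type):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Applies retract-style messages to rebuild the current result (not Flink code).
class RetractStreamSketch {

    // Hypothetical stand-in for Tuple2<Boolean, Row>: the flag plus a row rendered as a string.
    static final class Change {
        final boolean add;
        final String row;

        Change(boolean add, String row) {
            this.add = add;
            this.row = row;
        }
    }

    // Rebuilds the current result as a multiset (row -> number of copies).
    static Map<String, Integer> apply(List<Change> changes) {
        Map<String, Integer> current = new HashMap<>();
        for (Change c : changes) {
            current.merge(c.row, c.add ? 1 : -1, Integer::sum);
            current.remove(c.row, 0); // forget rows whose count dropped to zero
        }
        return current;
    }

    public static void main(String[] args) {
        // e.g. an updating count for one device: insert, retract the old value, insert the new one
        List<Change> changes = List.of(
                new Change(true, "device-a,1"),
                new Change(false, "device-a,1"),
                new Change(true, "device-a,2"));
        System.out.println(apply(changes)); // {device-a,2=1}
    }
}
```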
However, I don't get any results (and no error messages). When I change the SQL to:
// SQL
select * from view_order_consumer
Result:
6> (true,1628564685939,100100113280,002.qa2dtem5k6umlokop1boud4c8p77c9lhclb8i5ug0na383ed,94e44b95-223b-4479-82b9-b4f710f7f8c3,US,10.0,APPROVED,740baadd20e544e8bdcdc8d2a76cbdc9,c718225f5f1d4876ffc1ce2bb5ab3852,2021-08-10T11:04:45)
6> (true,1628564687358,100100113280,002.qa2dtem5k6umlokop1boud4c8p77c9lhclb8i5ug0na383ed,94e44b95-223b-4479-82b9-b4f710f7f8c3,US,11.0,APPROVED,740baadd20e544e8bdcdc8d2a76cbdc9,c718225f5f1d4876ffc1ce2bb5ab3852,2021-08-10T11:04:47)
6> (true,1628564688364,100100113280,002.qa2dtem5k6umlokop1boud4c8p77c9lhclb8i5ug0na383ed,94e44b95-223b-4479-82b9-b4f710f7f8c3,US,12.0,APPROVED,740baadd20e544e8bdcdc8d2a76cbdc9,c718225f5f1d4876ffc1ce2bb5ab3852,2021-08-10T11:04:48)
After some research, I found that the time attribute is dropped after a join operator in Flink.
Can anyone tell me how I can get the correct result using Flink SQL?