Here are my requirements. There are 3 Kafka topics: topic1-device, topic2-consumer and topic3-order. We are going to calculate the order amount per device over the past 12 hours using Flink SQL. I did the following:

  1. Register 3 tables corresponding to the 3 Kafka topics:
// java code 
String creConsumer = "CREATE TABLE " + consumerTable + " (" +
                    " deviceId STRING" +
                    ",deviceFingerprintHash STRING" +
                    ",consumer ROW(consumerUuid STRING)" +
                    ",eventInfo ROW<eventTime BIGINT>" +
                    ",id BIGINT" +
                    ",ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime/1000, 'yyyy-MM-dd HH:mm:ss'))" +
                    ",WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
                ") WITH (...)";

String createToken = "CREATE TABLE " + orderTokenTable + " (" +
                    "sessionId BIGINT" +
                    ",token STRING" +
                    ",eventInfo ROW(eventTime BIGINT)" +
                    ",ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime/1000, 'yyyy-MM-dd HH:mm:ss'))" +
                    ",WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
                ") WITH (...)";

String createTransaction = "CREATE TABLE " + orderTransactionTable + " (" +
                    "orderTransactionId BIGINT" +
                    ",consumer ROW(`consumerUuid` STRING)" +
                    ",token STRING" +
                    ",countryCode STRING" +
                    ",consumerTotalAmount ROW<amount STRING>" +
                    ",status STRING" +
                    ",eventInfo ROW<eventTime BIGINT>" +
                    ",ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime/1000, 'yyyy-MM-dd HH:mm:ss'))" +
                    ",WATERMARK FOR ts AS withOffset(ts,1000)" +
                ") WITH (...)";
  2. Join the 3 tables to create a view:
// java code
String createWideTable = "CREATE VIEW view_order_consumer AS " +
                "SELECT " +
                    " otc.eventInfo.eventTime AS eventTime " +
                    ",otc.orderTransactionId " +
                    ",otc.token " +
                    ",otc.consumer.consumerUuid AS consumerUuid " +
                    ",otc.countryCode " +
                    ",CAST(otc.consumerTotalAmount.amount AS DOUBLE) AS amount " +
                    ",otc.status " +
                    ",csc.deviceId " +
                    ",csc.deviceFingerprintHash " +
                    ",otc.ts " +
                "FROM " +
                    "   order_transaction_completed otc " +
                "INNER JOIN order_token_added ota " +
                    "   ON (otc.token = ota.token AND otc.ts BETWEEN ota.ts - INTERVAL '10' DAY AND ota.ts + INTERVAL '10' DAY) " +
                "INNER JOIN consumer_session_created csc " +
                    "   ON (ota.sessionId = csc.id AND csc.ts BETWEEN otc.ts - INTERVAL '10' DAY AND otc.ts) ";
  3. Do the aggregation using a window aggregation (HOP window):
// SQL
select deviceId
     ,HOP_START(voc.ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND) as window_start
     ,count(1) as cnt
from view_order_consumer as voc
group by HOP(voc.ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND)
       ,deviceId
  4. Output the result table data:
// java
// each element is (true, row) for an insert or (false, row) for a retraction
DataStream<Tuple2<Boolean, Row>> retractResultStream = tableEnvironment
        .toRetractStream(table, Row.class);
retractResultStream.print();
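For reference, HOP(ts, slide, size) takes the slide first and the size second, so the query in step 3 uses 10-second windows that start every 5 seconds, and each row lands in size / slide = 2 windows. The plain-Java sketch below has no Flink dependency (the class and method names are hypothetical) and computes which window starts a timestamp falls into, assuming windows aligned to the epoch with no offset. Each of those windows only emits its count once the watermark passes the window end, so a watermark that never advances yields no output at all. For the 12-hour requirement the size would presumably become INTERVAL '12' HOUR.

```java
import java.util.ArrayList;
import java.util.List;

class HopWindowDemo {
    // Returns the start (epoch millis) of every hopping window [start, start + size)
    // that contains ts. Windows are assumed to be aligned to the epoch (offset 0).
    static List<Long> windowStarts(long ts, long slideMillis, long sizeMillis) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slideMillis); // latest start <= ts
        for (long start = lastStart; start > ts - sizeMillis; start -= slideMillis) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 1628564685939L; // sample eventTime taken from the question's output
        // HOP(voc.ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND): slide 5 s, size 10 s
        List<Long> starts = windowStarts(ts, 5_000L, 10_000L);
        System.out.println(starts); // [1628564685000, 1628564680000]
        for (long start : starts) {
            // Each window only emits once the watermark reaches its end.
            System.out.println("[" + start + ", " + (start + 10_000L) + ")");
        }
    }
}
```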

However, I don't get any results (and no error messages). When I change the SQL to:

// SQL
select * from view_order_consumer

Result:

6> (true,1628564685939,100100113280,002.qa2dtem5k6umlokop1boud4c8p77c9lhclb8i5ug0na383ed,94e44b95-223b-4479-82b9-b4f710f7f8c3,US,10.0,APPROVED,740baadd20e544e8bdcdc8d2a76cbdc9,c718225f5f1d4876ffc1ce2bb5ab3852,2021-08-10T11:04:45)
6> (true,1628564687358,100100113280,002.qa2dtem5k6umlokop1boud4c8p77c9lhclb8i5ug0na383ed,94e44b95-223b-4479-82b9-b4f710f7f8c3,US,11.0,APPROVED,740baadd20e544e8bdcdc8d2a76cbdc9,c718225f5f1d4876ffc1ce2bb5ab3852,2021-08-10T11:04:47)
6> (true,1628564688364,100100113280,002.qa2dtem5k6umlokop1boud4c8p77c9lhclb8i5ug0na383ed,94e44b95-223b-4479-82b9-b4f710f7f8c3,US,12.0,APPROVED,740baadd20e544e8bdcdc8d2a76cbdc9,c718225f5f1d4876ffc1ce2bb5ab3852,2021-08-10T11:04:48)
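The ts column in this output can be cross-checked against eventTime. The computed column TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime/1000, 'yyyy-MM-dd HH:mm:ss')) divides by 1000, which drops the milliseconds, and formats the remaining seconds in the session time zone. The plain-Java sketch below mimics that conversion; the class name is hypothetical, and the UTC+8 zone is an assumption inferred from the printed values (1628564685939 is 03:04:45 UTC but prints as 11:04:45). If your Flink version provides TO_TIMESTAMP_LTZ (1.13+), TO_TIMESTAMP_LTZ(eventInfo.eventTime, 3) keeps millisecond precision.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

class EventTimeDemo {
    // Mimics TO_TIMESTAMP(FROM_UNIXTIME(eventTime / 1000, 'yyyy-MM-dd HH:mm:ss')):
    // integer division truncates the milliseconds, and the seconds are rendered
    // as a local date-time in the given (assumed) session time zone.
    static LocalDateTime toTs(long eventTimeMillis, ZoneId sessionZone) {
        long seconds = eventTimeMillis / 1000;
        return LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds), sessionZone);
    }

    public static void main(String[] args) {
        long eventTime = 1628564685939L; // first eventTime in the output above
        System.out.println(toTs(eventTime, ZoneOffset.ofHours(8))); // 2021-08-10T11:04:45
        System.out.println(toTs(eventTime, ZoneOffset.UTC));        // 2021-08-10T03:04:45
    }
}
```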

After some research, I found that the time attribute is dropped after a join operator in Flink.

Can anyone tell me how I can get the correct results using Flink SQL?

Comment from David Anderson: You should be able to make that work. Please share a minimal, reproducible example, including the table and view definitions, the query, and any error messages, so we can figure this out.

Answer

I'm not sure what's wrong, but here are some ideas:

FWIW, it's easier to iterate on and debug these situations using an interactive tool, such as the SQL client that ships with Flink or the one built into Ververica Platform.

You can use "explain ..." to see the plan that has been produced for the query. This often provides some clues.

I'm not sure what to expect from WATERMARK FOR ts AS withOffset(ts,1000). Are you sure that works? In most cases where a Flink job or SQL query produces no results, the cause turns out to be a problem with the watermarking.
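To make the watermark point concrete, here is a simplified plain-Java model (not Flink's actual implementation; all names are hypothetical): each input's watermark is the largest timestamp seen minus a fixed delay, an operator with several inputs, such as a join, can only advance to the minimum of its inputs' watermarks, and a window emits only once that watermark reaches its end. The usual DDL for a 1-second bound is WATERMARK FOR ts AS ts - INTERVAL '1' SECOND, which appears to be what withOffset(ts,1000) is aiming for. An input (or Kafka partition) that produces no events keeps the combined watermark from advancing; Flink's table.exec.source.idle-timeout option exists for that case.

```java
import java.util.Arrays;

class WatermarkDemo {
    // Bounded-out-of-orderness watermark for a single input:
    // the largest timestamp seen so far minus a fixed delay.
    static long watermark(long[] timestamps, long delayMillis) {
        if (timestamps.length == 0) {
            return Long.MIN_VALUE; // no events yet, so no progress
        }
        return Arrays.stream(timestamps).max().getAsLong() - delayMillis;
    }

    // An operator with several inputs (e.g. a join) can only advance its
    // watermark to the minimum of its inputs' watermarks.
    static long combined(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    // An event-time window [start, end) emits once the watermark reaches end - 1.
    static boolean fires(long windowEndMillis, long watermark) {
        return watermark >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long[] orders = {1628564685939L, 1628564687358L, 1628564688364L};
        long[] tokens = {}; // an input that has not produced any events
        long ordersWm = watermark(orders, 5_000L);
        long joinWm = combined(ordersWm, watermark(tokens, 5_000L));
        System.out.println(fires(1628564680000L, ordersWm)); // true
        System.out.println(fires(1628564680000L, joinWm));   // false
    }
}
```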

I would try breaking the 3-way join apart and doing a sequence of two joins instead. That may produce a different plan, possibly one that works better.
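A sketch of that split, written as Java string constants in the same style as the question. The view names order_with_token and order_with_device are hypothetical, the join predicates are copied from the original view, and the nested-field access assumes the DDL shown in the question. Running EXPLAIN on the HOP query over the second view should show whether ts is still treated as a time attribute after both joins.

```java
class TwoStepJoinSql {
    // Step 1 (hypothetical view name): transactions joined with their token events.
    static final String ORDER_WITH_TOKEN =
            "CREATE VIEW order_with_token AS " +
            "SELECT otc.orderTransactionId, otc.token, otc.countryCode, " +
            "CAST(otc.consumerTotalAmount.amount AS DOUBLE) AS amount, " +
            "otc.status, ota.sessionId, otc.ts " +
            "FROM order_transaction_completed otc " +
            "INNER JOIN order_token_added ota " +
            "ON otc.token = ota.token " +
            "AND otc.ts BETWEEN ota.ts - INTERVAL '10' DAY AND ota.ts + INTERVAL '10' DAY";

    // Step 2 (hypothetical view name): add device information from the session events.
    static final String ORDER_WITH_DEVICE =
            "CREATE VIEW order_with_device AS " +
            "SELECT owt.*, csc.deviceId, csc.deviceFingerprintHash " +
            "FROM order_with_token owt " +
            "INNER JOIN consumer_session_created csc " +
            "ON owt.sessionId = csc.id " +
            "AND csc.ts BETWEEN owt.ts - INTERVAL '10' DAY AND owt.ts";

    public static void main(String[] args) {
        // With Flink on the classpath, each statement would be passed to
        // tableEnvironment.executeSql(...), and tableEnvironment.explainSql(...)
        // on the HOP query (reading from order_with_device) would show the plan.
        System.out.println(ORDER_WITH_TOKEN);
        System.out.println(ORDER_WITH_DEVICE);
    }
}
```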