
I use Flink SQL and CEP to recognize some really simple patterns. However, I ran into some strange behavior (likely a bug). I have two example tables, password_change and transfer, as shown below.

transfer

transid,accountnumber,sortcode,value,channel,eventtime,eventtype
1,123,1,100,ONL,2020-01-01T01:00:01Z,transfer
3,123,1,100,ONL,2020-01-01T01:00:02Z,transfer
4,123,1,200,ONL,2020-01-01T01:00:03Z,transfer
5,456,1,200,ONL,2020-01-01T01:00:04Z,transfer

password_change

accountnumber,channel,eventtime,eventtype
123,ONL,2020-01-01T01:00:05Z,password_change
456,ONL,2020-01-01T01:00:06Z,password_change
123,ONL,2020-01-01T01:00:08Z,password_change
123,ONL,2020-01-01T01:00:09Z,password_change

Here are my SQL queries.

First, I create a temporary view event as

(SELECT accountnumber, rowtime, eventtype FROM password_change WHERE channel = 'ONL')
UNION ALL
(SELECT accountnumber, rowtime, eventtype FROM transfer WHERE channel = 'ONL')

The rowtime column is the event time, extracted directly from the original eventtime column, with a periodic bounded watermark of 1 second.
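
For reference, here is a minimal sketch of how the view might be registered, assuming tEnv is the StreamTableEnvironment mentioned later in this post:

// Register the UNION ALL query as a temporary view named "event".
tEnv.createTemporaryView("event", tEnv.sqlQuery(
        "(SELECT accountnumber, rowtime, eventtype FROM password_change WHERE channel = 'ONL') " +
        "UNION ALL " +
        "(SELECT accountnumber, rowtime, eventtype FROM transfer WHERE channel = 'ONL')"));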

Then I output the result of the following query:

SELECT * FROM `event`
    MATCH_RECOGNIZE ( 
        PARTITION BY accountnumber 
        ORDER BY rowtime 
        MEASURES 
            transfer.eventtype AS event_type,
            transfer.rowtime AS transfer_time
        ONE ROW PER MATCH 
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (transfer password_change) WITHIN INTERVAL '5' SECOND
        DEFINE 
            password_change AS eventtype='password_change', 
            transfer AS eventtype='transfer' 
    )
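
For completeness, here is one way the query could be executed and printed with the Flink 1.11 API (a minimal sketch; substitute the full MATCH_RECOGNIZE statement from above for the elided part):

// Runs the pattern query and prints the matches to stdout.
tEnv.executeSql("SELECT * FROM `event` MATCH_RECOGNIZE ( ... )").print();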

It should output the following (for each account, the transfer row that is immediately followed by a password_change within 5 seconds):

123,transfer,2020-01-01T01:00:03Z
456,transfer,2020-01-01T01:00:04Z

But I got nothing when running Flink 1.11.1 (and no output on 1.10.1 either).

What's more, if I change the pattern to just password_change, it still outputs nothing; if I change the pattern to just transfer, it outputs several rows, but not all of the transfer rows. If I swap the event times of the two tables, so that the password_change events happen first, then the pattern password_change outputs several rows while transfer outputs none.

On the other hand, if I extract those columns from the two tables, merge them into one table manually, and then emit that into Flink, the result is correct.

I searched and tried a lot of things to get this right, including changing the SQL statement, the watermark, the buffer timeout, and so on, but nothing helped. I hope someone here can help. Thanks.

Update Oct. 10th 2020:

I use Kafka as the table source. tEnv is the StreamTableEnvironment.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;

// Shared Kafka connector settings.
Kafka kafka = new Kafka()
        .version("universal")
        .property("bootstrap.servers", "localhost:9092");

// transfer table: rowtime is derived from the eventtime field,
// with a periodic bounded watermark of 1 second.
tEnv.connect(kafka.topic("transfer"))
    .withFormat(new Json().failOnMissingField(true))
    .withSchema(new Schema()
            .field("rowtime", DataTypes.TIMESTAMP(3))
            .rowtime(new Rowtime()
                    .timestampsFromField("eventtime")
                    .watermarksPeriodicBounded(1000))
            .field("channel", DataTypes.STRING())
            .field("eventtype", DataTypes.STRING())
            .field("transid", DataTypes.STRING())
            .field("accountnumber", DataTypes.STRING())
            .field("value", DataTypes.DECIMAL(38, 18)))
    .createTemporaryTable("transfer");

// password_change table: same rowtime/watermark definition.
tEnv.connect(kafka.topic("pchange"))
    .withFormat(new Json().failOnMissingField(true))
    .withSchema(new Schema()
            .field("rowtime", DataTypes.TIMESTAMP(3))
            .rowtime(new Rowtime()
                    .timestampsFromField("eventtime")
                    .watermarksPeriodicBounded(1000))
            .field("channel", DataTypes.STRING())
            .field("accountnumber", DataTypes.STRING())
            .field("eventtype", DataTypes.STRING()))
    .createTemporaryTable("password_change");

Thanks to @Dawid Wysakowicz's answer. To confirm it, I appended 4,123,1,200,ONL,2020-01-01T01:00:10Z,transfer to the end of the transfer table, and the output then became correct, which means it really is a problem with the watermarks.

So now the question is how to fix it. Since a user will not change his/her password frequently, the time gap between these two tables is unavoidable. I just need the UNION ALL table to behave the same way as the manually merged table does.
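
One option that may help here (an assumption on my part, not something verified against the legacy connect() sources) is the table.exec.source.idle-timeout option, which marks a source as idle after the given timeout so it no longer holds back the union's watermark:

// Assumption: Flink 1.11+ with the Blink planner. A source that produces no
// records for 5 seconds is marked idle and stops holding back the watermark.
tEnv.getConfig().getConfiguration()
        .setString("table.exec.source.idle-timeout", "5 s");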

Update Nov. 4th 2020:
A WatermarkStrategy that handles idle sources may help.
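
For example, on the DataStream side such a strategy could look like the sketch below; TransferEvent and getEventTimeMillis() are hypothetical placeholders for the actual event type and timestamp accessor:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

WatermarkStrategy<TransferEvent> strategy =
        WatermarkStrategy
                // same 1-second out-of-orderness bound as the table definitions above
                .<TransferEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                // hypothetical accessor for the event-time field in milliseconds
                .withTimestampAssigner((event, ts) -> event.getEventTimeMillis())
                // after 10 seconds without records the source is considered idle
                // and no longer holds back downstream (union) watermarks
                .withIdleness(Duration.ofSeconds(10));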

Comment by Dawid Wysakowicz: Most likely the problem is somewhere around watermark generation in conjunction with the UNION ALL operator. Could you share how you create the two tables, including how you define the time attributes?

1 Answer


Most likely the problem is somewhere around watermark generation in conjunction with the UNION ALL operator. Could you share how you create the two tables, including how you define the time attributes and which connectors you use? That would let me confirm my suspicions.

I think the problem is that one of the sources stops emitting Watermarks. If the transfer table (or whichever table has the lower timestamps) sees no further records, it emits no further Watermarks. After emitting the fourth row (timestamp 2020-01-01T01:00:04Z), it emits Watermark = 2020-01-01T01:00:03Z (the timestamp minus the 1-second bound). The Watermark of a union of inputs is the smallest of the Watermarks of its inputs. Therefore the transfer table holds the combined Watermark back at 01:00:03, which is why you see no progress for the original query, and why you see some records emitted for the pattern over the table with the smaller timestamps.
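
To make the arithmetic concrete, here is a small self-contained illustration (plain Java, not Flink API) of why the union's Watermark gets stuck:

import java.time.Instant;

public class UnionWatermarkDemo {
    public static void main(String[] args) {
        long bound = 1000L; // the 1-second bounded out-of-orderness
        // Each input's watermark = highest observed event timestamp minus the bound.
        long transferWm = Instant.parse("2020-01-01T01:00:04Z").toEpochMilli() - bound;
        long pchangeWm = Instant.parse("2020-01-01T01:00:09Z").toEpochMilli() - bound;
        // The union's watermark is the minimum of its inputs' watermarks.
        long unionWm = Math.min(transferWm, pchangeWm);
        System.out.println(Instant.ofEpochMilli(unionWm)); // 2020-01-01T01:00:03Z
        // The match ending at password_change@01:00:05 can only be emitted once
        // the watermark passes 01:00:05, which never happens while the transfer
        // source stays silent at 01:00:03.
    }
}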

If you manually merge the two tables into one, you have just a single input with a single source of Watermarks, so the Watermark progresses further and you see results.