0
votes

I need to count the number of times in a day that A happens and in 15 minutes that B happens。 The stream maybe A1 ,A2,B1,B2,A3,B3,B4,B5,A4,A5,A6,A7,B6。 In my case the event results are A2,B1 A3,B3 A7,B6。 And I need receive realtime result when the matcher happen。 I've tired something。 I think it can be only true by use flink cep 。But flink-sql-cep not support aggregation. It only calculate event happened。 In this case ,how to accomplish this task with a single SQL.

I tired two step to do it.I use flink sql cep to matcher first,and then sink to kafka. In step to I souce pre kafka and use over window to aggregation.

first step: select pins as pin,'first-step' as result_id, cast(order_amount as varchar) as result_value,event_time as result_time from stra_dtpipeline MATCH_RECOGNIZE ( PARTITION BY pin
ORDER BY event_time MEASURES
t1.pin as pins, '1' as order_amount, LOCALTIMESTAMP as event_time ONE ROW PER MATCH AFTER MATCH SKIP to next row PATTERN (t1 t2) WITHIN INTERVAL '30' SECOND
DEFINE
t1 as t1.act_type='100001' , t2 as t2.act_type='100002' ) second step: select pin,'job5' as result_id,cast(sum(1) over (PARTITION BY pin,cast(DATE_FORMAT(event_time,'%Y%m%d') as VARCHAR) order by event_time ROWS BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW ) as VARCHAR) as result_value,CURRENT_TIMESTAMP as result_time from stra_dtpipeline_mid where result_id='first-step' and DAYOFMONTH(CURRENT_DATE)=DAYOFMONTH(event_time)

I expect accomplish this task with a single SQL.

1

1 Answers

0
votes

You can compose the two queries into a single query by using either a subquery or a view.

That would be something like

SELECT a, b OVER (...) ORDER BY event_time FROM (SELECT x, y MATCH_RECOGNIZE ...) WHERE ...

or

CREATE VIEW pattern AS SELECT x, y MATCH_RECOGNIZE ...
SELECT ... FROM pattern WHERE ...