I need to count how many times in a day event A happens and is then followed by event B within 15 minutes. The stream might look like A1, A2, B1, B2, A3, B3, B4, B5, A4, A5, A6, A7, B6. In this case the matching results are (A2, B1), (A3, B3), (A7, B6). I also need to receive the result in real time whenever a match occurs. I've tried a few things. I think this can only be done with Flink CEP, but flink-sql-cep does not support aggregation; it only detects that the events happened. In this case, how can I accomplish this task with a single SQL statement?
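To make the expected matches concrete, here is a small Python sketch (not Flink) of the matching I have in mind. It assumes a match is an A event immediately followed by a B event, and it ignores the day and 15-minute time limits. The function name `match_a_then_b` is made up for illustration only.

```python
from typing import List, Tuple


def match_a_then_b(stream: List[str]) -> List[Tuple[str, str]]:
    """Return every (A, B) pair where the B event directly follows the A event."""
    matches = []
    # Walk over adjacent pairs of events in arrival order.
    for prev, curr in zip(stream, stream[1:]):
        if prev.startswith("A") and curr.startswith("B"):
            matches.append((prev, curr))
    return matches


stream = "A1 A2 B1 B2 A3 B3 B4 B5 A4 A5 A6 A7 B6".split()
matches = match_a_then_b(stream)

# The running count is what I want emitted each time a match occurs.
running_counts = [(pair, n) for n, pair in enumerate(matches, start=1)]
for pair, n in running_counts:
    print(pair, n)
```

Each printed line is one match plus the count so far, which is the value I want the single SQL statement to emit per match.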
I tried doing it in two steps. First I use Flink SQL CEP to do the matching and sink the results to Kafka. In the second step I read that Kafka topic as a source and use an OVER window to do the aggregation.
First step:
SELECT pins AS pin,
       'first-step' AS result_id,
       CAST(order_amount AS VARCHAR) AS result_value,
       event_time AS result_time
FROM stra_dtpipeline MATCH_RECOGNIZE
(
    PARTITION BY pin
    ORDER BY event_time
    MEASURES
        t1.pin AS pins,
        '1' AS order_amount,
        LOCALTIMESTAMP AS event_time
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO NEXT ROW
    PATTERN (t1 t2) WITHIN INTERVAL '30' SECOND
    DEFINE
        t1 AS t1.act_type = '100001',
        t2 AS t2.act_type = '100002'
)
Second step:
SELECT pin,
       'job5' AS result_id,
       CAST(SUM(1) OVER (
           PARTITION BY pin, CAST(DATE_FORMAT(event_time, '%Y%m%d') AS VARCHAR)
           ORDER BY event_time
           ROWS BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW
       ) AS VARCHAR) AS result_value,
       CURRENT_TIMESTAMP AS result_time
FROM stra_dtpipeline_mid
WHERE result_id = 'first-step'
  AND DAYOFMONTH(CURRENT_DATE) = DAYOFMONTH(event_time)
I would like to accomplish this task with a single SQL statement.