The problem i am facing is that I am unable to perform a sum on a single CEP pattern in scala. I want to detect when sum is greater than 6100 for a particular customerID. I am providing a keyed stream to the CEP.pattern(...). I have provided my code below for pattern construction.
val pattern1 =Pattern.begin[GenericRecord]("start").where((v,ctx) => {
lazy val sum= ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum
print((sum + v.get("amount").toString.toLong).toString)
//print(sum+v.get("amount").toString.toLong>6100)
//println(v.get("customer_id").toString+v.get("amount").toString+" , ")
(sum+v.get("amount").toString.toLong)>6100 && v.get("state").toString=="FAILED"
}).oneOrMore
My input is in avro format and Flink is consuming it from kafka. The input looks like this -:
{"trasanction_id":196,"customer_id":28,"datetime":"2017-09-01 12:35:08","amount":6094,"state":"FAILED"}
{"trasanction_id":198,"customer_id":27,"datetime":"2017-09-01 12:36:04","amount":6024,"state":"FAILED"}
{"trasanction_id":199,"customer_id":27,"datetime":"2017-09-01 12:36:05","amount":2399,"state":"FAILED"}
{"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}```
However, below code runs well while using two patterns-:
val pattern1=Pattern.begin[GenericRecord]("start").followedBy("middle").where((v,ctx) => {
lazy val sum= ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum
print((sum + v.get("amount").toString.toLong).toString)
//print(sum+v.get("amount").toString.toLong>6100)
//println(v.get("customer_id").toString+v.get("amount").toString+" , ")
(sum+v.get("amount").toString.toLong)>6100 && v.get("state").toString=="FAILED"
}).oneOrMore