I'm prototyping a fraud application. We'll frequently have metrics like "total amount of cash transactions in the last 5 days" that we need to compare against some threshold to determine if we raise an alert.

We're looking to use Kafka Streams to create and maintain the aggregates and then create an enhanced version of the incoming transaction that has the original transaction fields plus the aggregates. This enhanced record gets processed by a downstream rules system.

I'm wondering what the best way to approach this is. I've prototyped creating the aggregates with code like this:

// 2-day windows that advance by 1 day, so each record falls into up to two windows.
TimeWindows twoDayHopping = TimeWindows.of(TimeUnit.DAYS.toMillis(2))
                                       .advanceBy(TimeUnit.DAYS.toMillis(1));
KStream<String, AdditiveStatistics> aggrStream = transactions
    // Keep only cash transactions on "P" accounts.
    .filter((key, value) ->
        value.getAccountTypeDesc().equals("P") &&
        value.getPrimaryMediumDesc().equals("CASH"))
    .groupByKey()
    .aggregate(AdditiveStatistics::new,
               (key, value, accumulator) ->
                   AdditiveStatsUtil.advance(value.getCurrencyAmount(), accumulator),
               twoDayHopping,
               metricsSerde,
               "sas10005_store")
    .toStream()
    // Re-key from the windowed key to the plain key, stamping the window start on the value.
    .map((key, value) -> {
        value.setTransDate(key.window().start());
        return new KeyValue<String, AdditiveStatistics>(key.key(), value);
    })
    .through(Serdes.String(), metricsSerde, datedAggrTopic);

This creates a store-backed stream that has one record per key per window. I then join the original transactions stream to this aggregate stream to produce the final output to a topic:

JoinWindows joinWindow = JoinWindows.of(TimeUnit.DAYS.toMillis(1))
                                    .before(TimeUnit.DAYS.toMillis(1))
                                    .after(-1)
                                    .until(TimeUnit.DAYS.toMillis(2) + 1);
KStream<String, Transactions10KEnhanced> enhancedTrans = transactions.join(aggrStream,
    (left, right) -> {
        // Copy the original transaction fields, then attach the aggregate.
        Transactions10KEnhanced out = new Transactions10KEnhanced();
        out.setAccountNumber(left.getAccountNumber());
        out.setAccountTypeDesc(left.getAccountTypeDesc());
        out.setPartyNumber(left.getPartyNumber());
        out.setPrimaryMediumDesc(left.getPrimaryMediumDesc());
        out.setSecondaryMediumDesc(left.getSecondaryMediumDesc());
        out.setTransactionKey(left.getTransactionKey());
        out.setCurrencyAmount(left.getCurrencyAmount());
        out.setTransDate(left.getTransDate());
        if (right != null) {
            out.setSum2d(right.getSum());
        }
        return out;
    },
    joinWindow);

This produces the correct results, but it seems to run for quite a while, even with a low number of records. I'm wondering if there's a more efficient way to achieve the same result.

Care to elaborate? For example, what do you mean by "it seems to run for quite a while"? The application should be running continuously rather than "run for a while, then stop". Do you mean that processing a new input record takes longer than you'd expect? If so, how long is "quite a while": a few seconds, a minute, an hour? - Michael G. Noll
I put a sleep in the main() method to allow the background threads to process the input data (16 input records with the same key). I have to set it to over 30 seconds to see any output to the topic. It appears that if the program isn't left to run that long, it doesn't have enough time to process even these few records. I'm wondering whether a lot of network/inter-process shuffling is causing this. - Tim Stearn

1 Answer

It's a config issue; see http://docs.confluent.io/current/streams/developer-guide.html#memory-management

Disabling caching by setting the cache size to zero (parameter cache.max.bytes.buffering in StreamsConfig) will resolve the "delayed" delivery to the output topic.
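As a minimal sketch of that setting, assuming the application is configured through a plain java.util.Properties object: the class name StreamsCacheConfig, the method name, the application id, and the broker address below are hypothetical placeholders, and only cache.max.bytes.buffering comes from the answer itself. The keys are written as string literals so the snippet compiles without the Kafka jars; in a real application you would normally use the constant StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG instead.

```java
import java.util.Properties;

// Hypothetical helper: builds Kafka Streams settings with record caching turned off.
class StreamsCacheConfig {

    static Properties withCachingDisabled(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A cache size of zero disables record caching, so aggregate updates
        // are forwarded downstream instead of being held back in the cache.
        props.setProperty("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; pass the resulting Properties to your Streams application.
        Properties props = withCachingDisabled("fraud-aggregates", "localhost:9092");
        System.out.println(props.getProperty("cache.max.bytes.buffering"));
    }
}
```

With caching enabled, buffered records are typically only flushed when the cache fills up or on commit, and commit.interval.ms defaults to 30000 ms as far as I know, which would be consistent with the roughly 30-second wait described in the comments. Lowering commit.interval.ms is probably an alternative if you want to keep the cache.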

You might also read this blog post for some background information about Streams design: https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/