
I am doing a POC on session windows for one of my streaming requirements. Session windows fit the requirement because I have to aggregate events with the same transaction id into a list. Each transaction can have multiple events generated by different applications, which push them to Kafka. Below is the code:

    StreamsConfig streamsConfig = new StreamsConfig(getProperties());
    Serde<String> stringSerde = Serdes.String();
    Serde<Transaction> transactionSerde = StreamsSerdes.TransactionSerde();

    // Aggregator: appends each incoming event to the session's list of transactions
    Aggregator<String, Transaction, List<Transaction>> agg = (key, value, list) -> {
        list.add(value);
        return list;
    };

    // Merger: combines the lists of two sessions when they are merged into one
    Merger<String, List<Transaction>> merger = (key, v1, v2) ->
            Stream.concat(v1.stream(), v2.stream())
                    .collect(Collectors.toList());

    // Session store with a 30-second retention period
    Materialized<String, List<Transaction>, SessionStore<Bytes, byte[]>> materialized =
            Materialized.<String, List<Transaction>>as(
                    Stores.persistentSessionStore("trans-store", 1000 * 30))
                    .withKeySerde(stringSerde)
                    .withValueSerde(StreamsSerdes.TransactionsListSerde());

    // Initializer: each new session starts with an empty list
    Initializer<List<Transaction>> init = () -> new ArrayList<>();

    StreamsBuilder builder = new StreamsBuilder();
    KTable<Windowed<String>, List<Transaction>> customerTransactionCounts =
            builder.stream(TRANSACTIONS_TOPIC,
                    Consumed.with(stringSerde, transactionSerde).withOffsetResetPolicy(LATEST))
            .groupBy((noKey, transaction) -> transaction.getCustomerId(),
                    Serialized.with(stringSerde, transactionSerde))
            // 10-second inactivity gap, 30-second retention
            .windowedBy(SessionWindows.with(10000).until(1000 * 30))
            .aggregate(init, agg, merger, materialized);

    customerTransactionCounts.toStream().print(Printed.<Windowed<String>, List<Transaction>>toSysOut()
            .withLabel("Customer Transactions List")
            .withKeyValueMapper((key, list) ->
                    "Current Time " + new Date() + " Customer Id - " + key.key() +
                    " START " + new Date(key.window().start()) +
                    " --- END " + new Date(key.window().end()) + "  " + list));

    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
    kafkaStreams.cleanUp();
    kafkaStreams.start();
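
getProperties() is not shown above; it just builds a standard Kafka Streams configuration, roughly along the lines of this sketch (the application id, broker address, and commit interval values here are placeholders, not my actual settings):

    private Properties getProperties() {
        Properties props = new Properties();
        // placeholder values; the real application id, brokers, and commit interval differ
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "session-window-poc");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // aggregation results are forwarded downstream on commit, so this drives when they are printed
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
        return props;
    }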

How does the retention period work here?

1) First I ingested some data with transaction id X, with the events' date range running from

START Mon Apr 16 22:25:40 EDT 2018 --- END Mon Apr 16 22:25:49 EDT 2018

All of them were aggregated, as they fall into the same session.

2) Next I ingested a single record with transaction id X at time START Mon Apr 16 22:26:45 EDT 2018.

I see 1 record after the commit interval, as expected.

Here, as per my understanding, the stream time advanced to 22:26:45. At this point the records ingested above should be expired from the state store, since their end time < stream time - retention period (30 secs).

3) Next I ingested a single record with transaction id X that falls into the same time range as the first set of events. After the commit interval I see all the records from the first step plus the new record in the aggregated result.

Shouldn't the first set of records have been expired from the state store, as they are past the retention period?

In the third step I was assuming I would get only one aggregated record, since the older ones should have been removed. When does the retention period kick in to remove records from the state store?
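
For clarity, this is the per-record expiry check I was assuming. It is only my mental model, not anything from the Kafka Streams code base; names are made up for illustration:

    // My assumed (per-record) expiry rule, written out for clarity.
    static final long RETENTION_MS = 1000 * 30;

    static boolean expectedToBeExpired(long sessionEndMs, long streamTimeMs) {
        // I assumed a session is dropped as soon as its end time falls more than
        // the retention period behind the current stream time.
        return sessionEndMs < streamTimeMs - RETENTION_MS;
    }

    // With the data above: session end = 22:25:49, stream time = 22:26:45,
    // and 22:25:49 < (22:26:45 - 30s) = 22:26:15, so I expected the first session to be gone.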


1 Answer


Retention time is a "minimum": for more efficient expiration, data is stored in so-called segments (based on time intervals), and a segment is only expired once all data in that segment has passed the retention time.
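
To make that concrete, here is a minimal sketch of segment-based expiry. It models the idea only, not the actual Kafka Streams store code, and the 60-second segment size is an assumption for illustration:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    // Simplified model of segment-based expiry: records are bucketed into fixed-size
    // time segments, and expiration drops whole segments, never individual records.
    public class SegmentedStoreModel {
        static final long RETENTION_MS = 30_000;         // the "minimum" retention from the question
        static final long SEGMENT_INTERVAL_MS = 60_000;  // assumed segment size, for illustration

        // segmentId -> record timestamps stored in that segment
        private final Map<Long, List<Long>> segments = new TreeMap<>();

        void put(long timestampMs) {
            segments.computeIfAbsent(timestampMs / SEGMENT_INTERVAL_MS, id -> new ArrayList<>())
                    .add(timestampMs);
        }

        // A segment is removed only once its entire time range is older than
        // (stream time - retention), i.e. once *all* data it could hold has passed retention.
        void expire(long streamTimeMs) {
            segments.keySet().removeIf(segmentId ->
                    (segmentId + 1) * SEGMENT_INTERVAL_MS <= streamTimeMs - RETENTION_MS);
        }
    }

Under this model, a record written near the start of a segment only disappears once the whole segment falls behind stream time by the retention period, so it can remain readable for roughly the segment interval plus the retention time. That is why the first set of records can still show up in the aggregate well after the nominal 30 seconds.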