0
votes

I am trying to build a custom state store which stores key to map of values.

Stream & Store configuration

        final Serde<HashMap<String, ?>> userSessionsSerde = Serdes.serdeFrom(new HashMapSerializer(), new HashMapDeserializer());
        StoreBuilder sessionStoreBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), 
                        Serdes.String(), 
                        userSessionsSerde);
        builder.addStateStore(sessionStoreBuilder);
        
        builder.stream("connection-events", Consumed.with(Serdes.String(), wsSerde))
            .transform(wsEventTransformerSupplier, storeName)
            .to("status-changes", Produced.with(Serdes.String(), Serdes.String()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();

Transformer

public class WSEventProcessor implements Transformer<String, ConnectionEvent, KeyValue<String, String>> {

    private String storeName = "user-sessions";
    private KeyValueStore<String, Map<String, ConnectionEvent>> stateStore;

    final Serde<HashMap<String, ?>> userSessionsSerde = Serdes.serdeFrom(new HashMapSerializer(), new HashMapDeserializer());

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        stateStore = (KeyValueStore<String, Map<String, ConnectionEvent>>) context.getStateStore(storeName);
    }

    @Override
    public void close() {
    }

    @Override
    public KeyValue<String, String> transform(String key, ConnectionEvent value) {
        boolean sendUpdate = false;
    
        //Send null if there are no updates to be sent to downstream processors
        if(value.getState() == WebSocketConnection.CONNECTED) {
            if(stateStore.get(key) == null) {
                stateStore.put(key, new HashMap<>());
                sendUpdate = true;
            }
            stateStore.get(key).put(value.getSessionId(), value);
            return sendUpdate ? KeyValue.pair(key, "Online") : null;
        }
        else {
            stateStore.get(key).remove(value.getSessionId());
            int size = stateStore.get(key).size();
            return stateStore.get(key).isEmpty() ? KeyValue.pair(key, "Offline") : null;
        }
    }

}  

The state store always has 0 size map for each key irrespective of connected and disconnected events. Am I doing something wrong?

1

1 Answers

0
votes

value object that you stored into stateStore.put(key, value) and stateStore.get(key) are different objects (as it serialized and then deserialized).

Your issue is related to modification of object returned from state store: stateStore.get(key).put(value.getSessionId(), value) and stateStore.get(key).remove(value.getSessionId()). when you update object stateStore.get(key), it's actually not persisted to state store, only changes that object.

So, to fix your issue, calculate required value (in your case HashMap), and only after that apply stateStore.put(key, calculated_value). If you need to remove key-value from state store, use stateStore.put(key, null). Your transform method should look approximately like:

public KeyValue<String, String> transform(String key, ConnectionEvent value) {
    Map<String, Object> valueFromStateStore = stateStore.get(key);
    Map<String, Object> valueToUpdate = ofNullable(valueFromStateStore).orElseGet(Collections::emptyMap);
    KeyValue<String, String> resultKeyValue = null;

    //Send null if there are no updates to be sent to downstream processors
    if(value.getState() == WebSocketConnection.CONNECTED) {
        if(valueToUpdate.isEmpty()) {
            resultKeyValue = KeyValue.pair(key, "Online");
        }
        valueToUpdate.put(value.getSessionId(), value);
    }
    else {
        valueToUpdate.remove(value.getSessionId());
        if (valueToUpdate.isEmpty()) {
            resultKeyValue = KeyValue.pair(key, "Offline");
        }
    }

    stateStore.put(key, valueToUpdate);
    return resultKeyValue;
}