I am trying to build a custom state store which stores key to map of values.
Stream & Store configuration
final Serde<HashMap<String, ?>> userSessionsSerde = Serdes.serdeFrom(new HashMapSerializer(), new HashMapDeserializer()); StoreBuilder sessionStoreBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), userSessionsSerde); builder.addStateStore(sessionStoreBuilder); builder.stream("connection-events", Consumed.with(Serdes.String(), wsSerde)) .transform(wsEventTransformerSupplier, storeName) .to("status-changes", Produced.with(Serdes.String(), Serdes.String())); KafkaStreams streams = new KafkaStreams(builder.build(), properties); streams.start();
Transformer
public class WSEventProcessor implements Transformer<String, ConnectionEvent, KeyValue<String, String>> {
private String storeName = "user-sessions";
private KeyValueStore<String, Map<String, ConnectionEvent>> stateStore;
final Serde<HashMap<String, ?>> userSessionsSerde = Serdes.serdeFrom(new HashMapSerializer(), new HashMapDeserializer());
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
this.context = context;
stateStore = (KeyValueStore<String, Map<String, ConnectionEvent>>) context.getStateStore(storeName);
}
@Override
public void close() {
}
@Override
public KeyValue<String, String> transform(String key, ConnectionEvent value) {
boolean sendUpdate = false;
//Send null if there are no updates to be sent to downstream processors
if(value.getState() == WebSocketConnection.CONNECTED) {
if(stateStore.get(key) == null) {
stateStore.put(key, new HashMap<>());
sendUpdate = true;
}
stateStore.get(key).put(value.getSessionId(), value);
return sendUpdate ? KeyValue.pair(key, "Online") : null;
}
else {
stateStore.get(key).remove(value.getSessionId());
int size = stateStore.get(key).size();
return stateStore.get(key).isEmpty() ? KeyValue.pair(key, "Offline") : null;
}
}
}
The state store always has 0 size map for each key irrespective of connected and disconnected events. Am I doing something wrong?