Add the events to a state store, using the value you want to sort by as the key. The KeyValueIterator returned by the state store then iterates over the KeyValue entries in key order.
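
    import java.time.Duration;

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.PunctuationType;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class SortProcessor extends AbstractProcessor<String, Event> {

        private static final Logger LOG = LoggerFactory.getLogger(SortProcessor.class);

        private final String stateStore;
        private final Long bufferIntervalInSeconds;
        // Why not use a simple Java NavigableMap? Check out my answer at : https://stackoverflow.com/a/62677079/2256618
        private KeyValueStore<String, Event> keyValueStore;

        public SortProcessor(String stateStore, Long bufferIntervalInSeconds) {
            this.stateStore = stateStore;
            this.bufferIntervalInSeconds = bufferIntervalInSeconds;
        }

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            keyValueStore = (KeyValueStore<String, Event>) context().getStateStore(stateStore);
            // Flush the buffered events once per interval, based on wall-clock time.
            context().schedule(Duration.ofSeconds(bufferIntervalInSeconds), PunctuationType.WALL_CLOCK_TIME, this::punctuate);
        }

        // Forwards every buffered event downstream in key order and removes it from the store.
        void punctuate(long timestamp) {
            LOG.info("Punctuator invoked...");
            try (KeyValueIterator<String, Event> iterator = keyValueStore.all()) {
                while (iterator.hasNext()) {
                    KeyValue<String, Event> next = iterator.next();
                    if (next.value == null) {
                        continue;
                    }
                    LOG.info("Sending {}", next.key);
                    // The sort key is only needed inside the store, so forward with a null key.
                    context().forward(null, next.value);
                    keyValueStore.delete(next.key);
                }
            }
        }

        // Buffers each incoming event under its sort key instead of forwarding it immediately.
        @Override
        public void process(String key, Event value) {
            Event event = Event.builder(value).payload(value.getPayload().toUpperCase()).build();
            keyValueStore.put(event.getEventType().name() + " " + event.getId(), event);
        }

        public static String getName() {
            return "sort-processor";
        }
    }
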
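One thing to watch with a String key like the one built in process(): strings compare character by character, so if the id is numeric, "CREATED 10" sorts before "CREATED 9". The following is only a minimal sketch of the buffer-then-flush idea in plain Java, with no Kafka Streams involved. A TreeMap stands in for the state store, and the class SortBufferSketch, the sortKey helper and the zero-padding of the id are my own assumptions for illustration, not part of the processor above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Illustrative stand-in for the buffering done by SortProcessor; not Kafka Streams code.
final class SortBufferSketch {

    // TreeMap keeps its keys sorted, playing the role of the ordered state store.
    private final TreeMap<String, String> buffer = new TreeMap<>();

    // Builds a composite key shaped like the one in process(): "<eventType> <id>".
    // The id is zero-padded (an assumption of this sketch) so string order matches numeric order.
    static String sortKey(String eventType, long id) {
        return eventType + " " + String.format("%019d", id);
    }

    void add(String eventType, long id, String payload) {
        buffer.put(sortKey(eventType, id), payload);
    }

    // Mimics punctuate(): returns every buffered payload in key order, then empties the buffer.
    List<String> flush() {
        List<String> out = new ArrayList<>(buffer.values());
        buffer.clear();
        return out;
    }

    public static void main(String[] args) {
        SortBufferSketch sketch = new SortBufferSketch();
        sketch.add("UPDATED", 2, "second update");
        sketch.add("CREATED", 10, "tenth create");
        sketch.add("CREATED", 9, "ninth create");
        // Prints [ninth create, tenth create, second update]
        System.out.println(sketch.flush());
    }
}
```

If your ids are not numeric, or you only care about grouping by event type, the padding is unnecessary and the key from process() can be used as is.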
Executable code is here. I have used a simple in-memory state store. If you anticipate a huge number of events in a short burst, you can use a persistent state store instead, as already suggested in another answer.