
I'm trying to write a simple Kafka Streams application (targeting Kafka 2.2/Confluent 5.2) to transform an input topic with at-least-once semantics into an exactly-once output stream. I'd like to encode the following logic:

  • For each message with a given key:
    • Read a message timestamp from a string field in the message value
    • Retrieve the greatest timestamp we've previously seen for this key from a local state store
      • If the message timestamp is less than or equal to the timestamp in the state store, don't emit anything
      • If the timestamp is greater than the timestamp in the state store, or the key doesn't exist in the state store, emit the message and update the state store with the message's key/timestamp
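Stripped of Kafka, the per-key rule above is roughly the following stdlib-only sketch. A HashMap stands in for the local state store, parsing the timestamp out of the value is left out, and `MaxTimestampFilter`/`accept` are made-up names for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the "emit only if strictly newer than anything seen for this key" rule.
// The HashMap is a stand-in for the Kafka Streams state store (assumption).
public class MaxTimestampFilter {
    private final Map<String, Long> maxSeen = new HashMap<>();

    // Returns true if the message should be emitted, and updates the stored max.
    public boolean accept(String key, long timestamp) {
        Long previous = maxSeen.get(key);
        if (previous != null && timestamp <= previous) {
            return false; // not newer than what we've already seen: drop
        }
        maxSeen.put(key, timestamp); // unknown key or strictly greater: remember it
        return true;
    }

    public static void main(String[] args) {
        MaxTimestampFilter filter = new MaxTimestampFilter();
        String[] keys = {"a", "a", "b", "a", "a"};
        long[] timestamps = {100, 100, 50, 90, 120};
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            if (filter.accept(keys[i], timestamps[i])) {
                emitted.add(keys[i] + "@" + timestamps[i]);
            }
        }
        System.out.println(emitted); // [a@100, b@50, a@120]
    }
}
```

The equal timestamp (a@100 a second time) and the older one (a@90) are dropped, matching the "less than or equal" rule.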

(This is guaranteed to provide correct results based on ordering guarantees that we get from the upstream system; I'm not trying to do anything magical here.)

At first I thought I could do this with the Kafka Streams flatMapValues operator, which lets you map each input message to zero or more output messages with the same key. However, that documentation explicitly warns:

This is a stateless record-by-record operation (cf. transformValues(ValueTransformerSupplier, String...) for stateful value transformation).

That sounds promising, but the transformValues documentation doesn't make it clear how to emit zero or one output messages per input message. Unless that's what the // or null aside in the example is trying to say?

flatTransform also looked somewhat promising, but I don't need to manipulate the key, and if possible I'd like to avoid repartitioning.

Anyone know how to properly perform this kind of filtering?

Note: flatTransformValues() is being added to Kafka Streams, and will most likely be available in the next Kafka version, which is v2.3. See cwiki.apache.org/confluence/display/KAFKA/… in case you need more details. This doesn't help you today, but you might want to keep it in mind for later. – Michael G. Noll

1 Answer


You could use a Transformer to implement the stateful operation you describe. To drop a message, return null from the transform method; the Transformer Javadoc states that when null is returned, no key-value pair is forwarded downstream. To emit a message, call processorContext.forward(key, value) explicitly. A simplified example:

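// The store must be registered with the topology before transform() can use it
builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(stateStoreName), Serdes.String(), Serdes.String()));
kStream.transform(() -> new DemoTransformer(stateStoreName), stateStoreName);

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class DemoTransformer implements Transformer<String, String, KeyValue<String, String>> {
    private final String stateStoreName;
    private ProcessorContext processorContext;
    private KeyValueStore<String, String> keyValueStore;

    public DemoTransformer(String stateStoreName) {
        this.stateStoreName = stateStoreName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext processorContext) {
        this.processorContext = processorContext;
        // getStateStore() returns a plain StateStore, so a cast is required
        this.keyValueStore = (KeyValueStore<String, String>) processorContext.getStateStore(stateStoreName);
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        String existingValue = keyValueStore.get(key);
        // Emit only if the key is new or the incoming timestamp is strictly greater
        if (existingValue == null || extractTimestamp(value) > extractTimestamp(existingValue)) {
            processorContext.forward(key, value);
            keyValueStore.put(key, value);
        }
        // Return null: anything that should go downstream was already forwarded above
        return null;
    }

    // Hypothetical helper: assumes the value is an epoch-millis string;
    // replace with your own parsing of the timestamp field
    private static long extractTimestamp(String value) {
        return Long.parseLong(value);
    }

    @Override
    public void close() {
    }
}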
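To make the forward()/return-null contract concrete, here is a stdlib-only simulation. `FakeContext` and `run()` are hypothetical stand-ins, not Kafka classes; `run()` models my understanding of the runtime: any non-null pair returned from transform() is forwarded in addition to whatever the transformer forwarded itself.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class ForwardContractSketch {

    // Hypothetical stand-in for ProcessorContext: collects forwarded records.
    static final class FakeContext {
        final List<String> downstream = new ArrayList<>();

        void forward(String key, String value) {
            downstream.add(key + "=" + value);
        }
    }

    // Models the runtime around transform(): the transformer may forward on its own,
    // and a non-null return value is forwarded as well.
    static List<String> run(FakeContext ctx,
                            BiFunction<String, String, Map.Entry<String, String>> transform,
                            String key, String value) {
        Map.Entry<String, String> result = transform.apply(key, value);
        if (result != null) {
            ctx.forward(result.getKey(), result.getValue());
        }
        return ctx.downstream;
    }

    public static void main(String[] args) {
        // 1. Forward explicitly, then return null: exactly one record.
        FakeContext c1 = new FakeContext();
        run(c1, (k, v) -> { c1.forward(k, v); return null; }, "a", "x");

        // 2. Return null without forwarding: the record is dropped.
        FakeContext c2 = new FakeContext();
        run(c2, (k, v) -> null, "a", "x");

        // 3. Forward AND return the pair: the record is emitted twice.
        FakeContext c3 = new FakeContext();
        run(c3, (k, v) -> { c3.forward(k, v); return new SimpleEntry<>(k, v); }, "a", "x");

        System.out.println(c1.downstream); // [a=x]
        System.out.println(c2.downstream); // []
        System.out.println(c3.downstream); // [a=x, a=x]
    }
}
```

Case 3 is the pitfall to avoid: if you call forward() inside transform(), return null rather than the pair, otherwise each accepted record goes downstream twice.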