0
votes

Currently, I have a basic Kafka Streams application whose Topology has only a source and a processor, but no sink. Essentially, the Topology only handles the consumption of messages. To produce messages, we call the Producer API from the processor returned by the ProcessorSupplier passed to the Topology, specifically in the overridden process method. I understand that the Producer API is redundant here, since I could simply add a sink to the topology, but I am in a position where I have to set up my Streams application this way.

For testing, I tried the TopologyTestDriver class from the kafka-streams-test-utils package. However, I want to test not only the topology but also the calls to the Producer API. Using the TopologyTestDriver requires me to mock my Producer instance, since it is separate from the Streams API. As a result, because the messages are not "forwarded", I am unable to read them from the TopologyTestDriver in my unit tests.

Here is a simplified version of my process method:

@Override
public void process(String key, String value) {
    // some data processing that I leave out for simplicity's sake
    String topic = "...";
    Properties props = ...;
    //Producer<String, String> producer = new KafkaProducer<>(props);
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
    producer.send(record);
}

And here is a simplification of my sample unit test:

@Test
public void process() {
    Topology topology = new Topology();
    topology.addSource("source", "input-topic");
    topology.addProcessor("processor", ..., "source");
    Properties props = ...;

    TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);

    ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
    // the following line will work fine as long as the producer is mocked
    testDriver.pipeInput(factory.create("input-topic", "key", "value"));

    // since the producer is mocked, no message can be read from the output topic
    ProducerRecord<String, String> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());

    assertNull(outputRecord); // passes, because nothing was forwarded to a sink
}

To sum up my question, is there a way to write a unit test that tests both the consumption and production of messages within a Topology that uses the Producer API for writing messages to outgoing topics?

1
Spring has an embedded Kafka that can be used as the infrastructure for doing this. - daniu

1 Answer

2
votes

You should not use a custom Producer; add a sink to your Topology instead. Calls to Producer.send() are asynchronous, so you might be subject to data loss: Kafka Streams may commit the input offset before the write has been acknowledged. To avoid data loss, you would need to make the call synchronous, i.e., get the Future that is returned by send() and wait for its completion before process() returns. However, this has a big impact on your throughput and is not recommended.
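To make the sync-versus-async point concrete, here is a minimal sketch that uses only java.util.concurrent. It is not Kafka code: fakeSend is a hypothetical stand-in for Producer.send(), which likewise returns a Future, and the 200 ms delay is an assumed placeholder for the broker round trip.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class SyncSendSketch {

    // Hypothetical stand-in for Producer.send(): the "delivery" finishes on
    // another thread after a delay, and the caller only gets a Future back.
    public static Future<String> fakeSend(String value, AtomicBoolean delivered) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200); // assumed round-trip time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            delivered.set(true);
            return "ack:" + value;
        });
    }

    // Async style: returns immediately, so the delivery is usually still
    // pending when this method (think: process()) returns.
    public static boolean processAsync(String value) {
        AtomicBoolean delivered = new AtomicBoolean(false);
        fakeSend(value, delivered);
        return delivered.get();
    }

    // Sync style: block on the Future, so the delivery has completed
    // by the time this method returns.
    public static boolean processSync(String value) {
        AtomicBoolean delivered = new AtomicBoolean(false);
        try {
            fakeSend(value, delivered).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the send", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("send failed", e);
        }
        return delivered.get();
    }
}
```

With a real producer, the sync variant would correspond to calling get() on the Future returned by send(). Every processed record then pays the full round trip, which is where the throughput cost comes from.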

If you add a sink, you avoid these issues: Kafka Streams then knows what data was sent to the output topic, so no data is lost, and it can still use the more performant asynchronous call.

Besides the correctness issue, it seems that your current code creates a new KafkaProducer for every message you process, which is quite inefficient. Furthermore, using a sink will simplify your code. And of course, you get proper testing capabilities using the TopologyTestDriver.
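As a rough sketch of the sink-based setup, assuming the same pre-2.4 Kafka Streams API that the question uses (AbstractProcessor, ConsumerRecordFactory, readOutput) and kafka-streams plus kafka-streams-test-utils on the classpath, it could look like this. The names SinkTopologySketch, ForwardingProcessor and pipeOne, the application id, and the bootstrap address are hypothetical placeholders; the topic names are taken from the question.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class SinkTopologySketch {

    // Hypothetical processor: forwards the record downstream instead of
    // calling producer.send() itself.
    public static class ForwardingProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            // data processing omitted, as in the question
            context().forward(key, value);
        }
    }

    // Source -> processor -> sink; the sink writes whatever the processor forwards.
    public static Topology buildTopology() {
        ProcessorSupplier<String, String> supplier = ForwardingProcessor::new;
        Topology topology = new Topology();
        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input-topic");
        topology.addProcessor("processor", supplier, "source");
        topology.addSink("sink", "output-topic", new StringSerializer(), new StringSerializer(), "processor");
        return topology;
    }

    // Pipes one record through the test driver and returns what reached the output topic.
    public static ProducerRecord<String, String> pipeOne(String key, String value) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sink-sketch-test"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");    // never contacted by the driver
        TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), props);
        try {
            ConsumerRecordFactory<String, String> factory =
                    new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
            driver.pipeInput(factory.create("input-topic", key, value));
            return driver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());
        } finally {
            driver.close();
        }
    }
}
```

In a unit test, pipeOne("key", "value") should then return a non-null record, in contrast to the assertNull in the question, because the forwarded record now passes through a sink that the driver can observe.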