Currently, I have a basic Kafka streams application that involves a Topology with only a source and a processor, but no sink. Essentially, the Topology only handles the consumption of messages. As for producing messages, we make calls to the Producer API in the ProcessorSupplier instance passed to the Topology, specifically in the overridden process
method. Although I understand that the Producer API is redundant here since I could have simply added a sink to the topology, I am in a position where I have to setup my streams application in this way. As for testing, I tried out the TopologyTestDriver
class that is available in the kafka-streams-test-utils package. However, I want to not only test out the topology but also the calls to the Producer API. Using the TopologyTestDriver
requires me to mock my Producer
instance since that is separate from the Streams API. As a result, because the messages are not "fowarded", I am unable to read messages from the TopologyTestDriver
for my unit tests.
Here is a simplified version of my process
method:
@Override
public void process(String key, String value) {
// some data processing stuff that I leave out for simplicity sake
String topic = "...";
Properties props = ...;
//Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord(topic, key, value);
producer.send(record);
}
And here is a simplification of my sample unit test:
@Test
public void process() {
Topology topology = new Topology();
topology.addSource("source", "input-topic");
topology.addProcessor("processor", ..., "source");
Properties props = ...;
TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
// the following line will work fine as long as the producer is mocked
testDriver.pipeInput(factory.create("input-topic", "key", "value"));
// since the producer is mocked, no message can be read from the output topic
ProducerRecord<String, String> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());
assertNull(outputRecord); // returns true
}
To sum up my question, is there a way to write a unit test that tests both the consumption and production of messages within a Topology that uses the Producer API for writing messages to outgoing topics?