4
votes

I'm using Java and Mockito to mock some methods for unit testing. I want to mock the producer in the below code so that I can test for the log messages that are sent when the exception is thrown. I tried mocking the future however I get the error that the future.get() method cannot be mocked, and the RecordMetadata class is final and cannot be mocked. Any help would be greatly appreciated.

The producer in the below example is a KafkaProducer.

public void send(Log log){
  Future<RecordMetadata> future = producer.send(new ProducerRecord<(this.topic, record));
  try {
     RecordMetadata recordMetadata = send.get();
  } catch (InterruptedException e) {
     LOG.error("Sending the message to kafka was interrupted. "+e);
  }
}
2

2 Answers

5
votes

Kafka supplies a MockProducer class that you can inject into your class for testing. You can set the producer in your class to an instance of MockProducer in a JUnit @Before-annotated setup method. From the referenced documentation for MockProducer:

A mock of the producer interface you can use for testing code that uses Kafka.

By default this mock will synchronously complete each send call successfully. However it can be configured to allow the user to control the completion of the call and supply an optional error for the producer to throw.

You can use the #errorNext method to supply the exception that you want to throw in your test scenario.

3
votes

It is difficult to be precise here without seeing your test code. Two issues

1) RecordMetadata cannot be used a Mockito mock, this is a known limitation of Mockito. Instead you can create a dummy instance of RecordMetadata using its public constructor.

2) KafkaProducer can be mocked by Mockito, but you need a two stage approach. Firstly the mock KafkaProducer returns a Future, and secondly that Future is also a mock that returns some known value.

public class ServiceTest {

    @Mock
    private KafkaProducer<String, Integer> producer;
    @Mock
    private Future<RecordMetadata> future;

    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);
    }

    @Test
    public void success() throws Exception {
        RecordMetadata dummyRecord = new RecordMetadata(null, 0L, 0L, 0L, 0L, 0, 0);
        when(producer.send(any())).thenReturn(future);
        when(future.get()).thenReturn(dummyRecord);

        producer.send(null);
    }

    @Test
    public void timeout() throws Exception {
        when(producer.send(any())).thenReturn(future);
        when(future.get()).thenThrow(new TimeoutException("timeout"));

        producer.send(null);
    }
}