41
votes

Edit FYI: working gitHub example


I was searching the internet and couldn't find a working and simple example of an embedded Kafka test.

My setup is:

  • Spring boot
  • Multiple @KafkaListener with different topics in one class
  • Embedded Kafka for test which is starting fine
  • Test with Kafkatemplate which is sending to topic but the @KafkaListener methods are not receiving anything even after a huge sleep time
  • No warnings or errors are shown, only info spam from Kafka in logs

Please help me. There are mostly over configured or overengineered examples. I am sure it can be done simple. Thanks, guys!

@Controller
public class KafkaController {

    private static final Logger LOG = getLogger(KafkaController.class);

    @KafkaListener(topics = "test.kafka.topic")
    public void receiveDunningHead(final String payload) {
        LOG.debug("Receiving event with payload [{}]", payload);
        //I will do database stuff here which i could check in db for testing
    }
}

private static String SENDER_TOPIC = "test.kafka.topic";

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

@Test
    public void testSend() throws InterruptedException, ExecutionException {

        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
        Thread.sleep(10000);
    }
3
show the code. see if this helps stackoverflow.com/questions/48682745/…pvpkiran
@pvpkiran this is still not working. the test does only test itself but never reaches my KafkaListener when i just take the sending part to my topicYuna Braska
It’s not clear by your test code how that KafkaController is involved. How are you sure that the listener is started?Artem Bilan
@ArtemBilan cause there is the [@KafkaListener] annotation on the method. or do I have todo something else?Yuna Braska
Right, the test needs to bootstrap an application context with that componentArtem Bilan

3 Answers

40
votes

Embedded Kafka tests work for me with below configs,

Annotation on test class

@EnableKafka
@SpringBootTest(classes = {KafkaController.class}) // Specify @KafkaListener class if its not the same class, or not loaded with test config
@EmbeddedKafka(
    partitions = 1, 
    controlledShutdown = false,
    brokerProperties = {
        "listeners=PLAINTEXT://localhost:3333", 
        "port=3333"
})
public class KafkaConsumerTest {
    @Autowired
    KafkaEmbedded kafkaEmbeded;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

Before annotation for setup method

@Before
public void setUp() throws Exception {
  for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
    ContainerTestUtils.waitForAssignment(messageListenerContainer, 
    kafkaEmbeded.getPartitionsPerTopic());
  }
}

Note: I am not using @ClassRule for creating embedded Kafka rather auto-wiring
@Autowired embeddedKafka

@Test
public void testReceive() throws Exception {
     kafkaTemplate.send(topic, data);
}

Hope this helps!

Edit: Test configuration class marked with @TestConfiguration

@TestConfiguration
public class TestConfig {

@Bean
public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setDefaultTopic(topic);
    return kafkaTemplate;
}

Now @Test method will autowire KafkaTemplate and use is to send message

kafkaTemplate.send(topic, data);

Updated answer code block with above line

12
votes

since the accepted answer doesn't compile or work for me. I find another solution based on https://blog.mimacom.com/testing-apache-kafka-with-spring-boot/ what I would like to share with you.

The dependency is 'spring-kafka-test' version: '2.2.7.RELEASE'

@RunWith(SpringRunner.class)
@EmbeddedKafka(partitions = 1, topics = { "testTopic" })
@SpringBootTest
public class SimpleKafkaTest {

    private static final String TEST_TOPIC = "testTopic";

    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;

    @Test
    public void testReceivingKafkaEvents() {
        Consumer<Integer, String> consumer = configureConsumer();
        Producer<Integer, String> producer = configureProducer();

        producer.send(new ProducerRecord<>(TEST_TOPIC, 123, "my-test-value"));

        ConsumerRecord<Integer, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TEST_TOPIC);
        assertThat(singleRecord).isNotNull();
        assertThat(singleRecord.key()).isEqualTo(123);
        assertThat(singleRecord.value()).isEqualTo("my-test-value");

        consumer.close();
        producer.close();
    }

    private Consumer<Integer, String> configureConsumer() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        Consumer<Integer, String> consumer = new DefaultKafkaConsumerFactory<Integer, String>(consumerProps)
                .createConsumer();
        consumer.subscribe(Collections.singleton(TEST_TOPIC));
        return consumer;
    }

    private Producer<Integer, String> configureProducer() {
        Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
    }
}
8
votes

I solved the issue now

@BeforeClass
public static void setUpBeforeClass() {
    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}

while I was debugging, I saw that the embedded kaka server is taking a random port.

I couldn't find the configuration for it, so I am setting the kafka config same as the server. Looks still a bit ugly for me.

I would love to have just the @Mayur mentioned line

@EmbeddedKafka(partitions = 1, controlledShutdown = false, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})

but can't find the right dependency in the internet.