
I'm trying to write a Kafka stream processor using Spring boot but it's not getting invoked when messages are produced into the topic.

I have the following producer that works fine with the topic name adt.events.location.

public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private static final String TOPIC = "adt.events.location";
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public Producer(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;

    public void produce() {
        this.kafkaTemplate.send(TOPIC, "1", new EventPatientCheckedIn(1L, 1001L, 104L, 11L, 6L));
        this.kafkaTemplate.send(TOPIC, "1", new EventPatientBedChanged(1L, 1001L, 7L));
        this.kafkaTemplate.send(TOPIC, "1", new EventPatientRoomChanged(1L, 1001L, 10L));
        this.kafkaTemplate.send(TOPIC, "2", new EventPatientCheckedIn(2L, 1002L, 110L, 18L, 2L));
        this.kafkaTemplate.send(TOPIC, "3", new EventPatientCheckedIn(3L, 1003L, 111L, 16L, 1L));
        this.kafkaTemplate.send(TOPIC, "1", new EventPatientCheckedOut(1L, 1001L));
        this.kafkaTemplate.send(TOPIC, "3", new EventPatientBedChanged(3L, 1003L, 3L));

The topic messages have different types and are in Avro format. The schema is registered in the schema registry with Avro union.

These are the topics

public class KafkaConfig {

    public NewTopic topicEventsLocation() {
        return TopicBuilder.name("adt.events.location").partitions(1).replicas(1).build();

    public NewTopic topicPatientLocation() {
        return TopicBuilder.name("adt.patient.location").partitions(1).replicas(1).build();

application.yml I'm using cp-all-in-one-community as docker-file

  port: 9000
          schemas: false
          version: true
          url: http://localhost:8081
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      application-id: kafka-demo
        default.key.serde: org.apache.kafka.common.serialization.Serdes$LongSerde
        default.value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde

Spring Boot application

public class KafkaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);

and the Processor

public class Processor {

    final StreamsBuilder builder = new StreamsBuilder();

    public void process() {

The expected behaviour is to see the schema name printed out every time a message is produced and the output topic adt.patient.location being populated with the stream process output but nothing happen.

I'm new to Kafka so probably I'm missing something.


I was actually missing the @EnableKafkaStreams annotation.

But now I get the following error:

2021-04-07 16:02:16.967 ERROR 120225 --- [           main] org.apache.kafka.streams.KafkaStreams    : stream-client [LocationService-9611eedf-df9b-4fe5-9a7d-058027cee22a] Topology with no input topics will create no stream threads and no global thread.
2021-04-07 16:02:16.967  WARN 120225 --- [           main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'defaultKafkaStreamsBuilder'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
Are you using spring cloud stream? The imperative or the functional style?Felipe
Post the configuration that you have in the application.ymlFelipe
functional style, without spring cloud stream.Pietro

2 Answers


Use @Autowired on the KafkaTemplate. I think this is the thing that you are missing. The example that I give does not use AvroSerializer. So I assume that your serializer is working. At least you should see the message arriving on the consumer or a serialization error. Moreover, you can improve your method to handle callbacks and use a more consistent message record. For instance, use the ProducerRecord to create the message that you will send. Add a callback using ListenableFuture.

public class Producer {
   KafkaTemplate<String, Object> kafkaTemplate;

   public void produce() {
        String key = "1";
        Object value = EventPatientCheckedIn....

        ProducerRecord<String, Object> producerRecord = buildProducerRecord(TOPIC, key, value);

        ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(producerRecord);

        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            public void onFailure(Throwable ex) { handleFailure(key, value, ex); }

            public void onSuccess(SendResult<String, Object> result) { handleSuccess(key, value, result); }

    private ProducerRecord<String, Object> buildProducerRecord(String topic, String key, Object value) {
        List<Header> recordHeaders = List.of(new RecordHeader("event-source", "scanner".getBytes()));
        return new ProducerRecord<String, Object>(topic, null, key, value, recordHeaders);

    private void handleSuccess(String key, Object value, SendResult<String, Object> result) {
        log.info("message send successfully for the key: {} and value: {} at partition: {}", key, value, result.getRecordMetadata().partition());

    private void handleFailure(String key, Object value, Throwable ex) {
        log.error("error sending the message and the exception us {}", ex.getMessage());
        try { throw ex; }
        catch (Throwable throwable) {
            log.error("error on failure: {}", throwable.getMessage());

UPDATE: I think you are missing to configure Properties and then make the streams.start(); on your Processor. I based this example on this reference.

Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "adt.events.location");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
KafkaStreams streams = new KafkaStreams(builder.build(), props);

I found two errors:

  1. Add the annotation @EnableKafkaStreams
  2. Error in the processor with the StreamsBuilder injection
public class KafkaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
public void process(final StreamsBuilder builder) {
    logger.info("Processing location events");