I'm trying to write a Kafka Streams processor using Spring Boot, but it is not invoked when messages are produced to the topic. I have the following producer, which works fine with the topic adt.events.location:
@Service
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private static final String TOPIC = "adt.events.location";

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public Producer(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @EventListener(ApplicationStartedEvent.class)
    public void produce() {
        this.kafkaTemplate.send(TOPIC, "1", new EventPatientCheckedIn(1L, 1001L, 104L, 11L, 6L));
        this.kafkaTemplate.send(TOPIC, "1", new EventPatientBedChanged(1L, 1001L, 7L));
        this.kafkaTemplate.send(TOPIC, "1", new EventPatientRoomChanged(1L, 1001L, 10L));
        this.kafkaTemplate.send(TOPIC, "2", new EventPatientCheckedIn(2L, 1002L, 110L, 18L, 2L));
        this.kafkaTemplate.send(TOPIC, "3", new EventPatientCheckedIn(3L, 1003L, 111L, 16L, 1L));
        this.kafkaTemplate.send(TOPIC, "1", new EventPatientCheckedOut(1L, 1001L));
        this.kafkaTemplate.send(TOPIC, "3", new EventPatientBedChanged(3L, 1003L, 3L));
    }
}
The messages on the topic have different types and are in Avro format; the schema is registered in the Schema Registry as an Avro union.
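(For context, the usual pattern with auto.register.schemas: false and use.latest.version: true is to register a top-level union as the schema of the topic's value subject, here adt.events.location-value, with each event type added as a schema reference. A sketch of such a union schema — the com.example namespace is an assumption, not taken from my project:

```json
[
  "com.example.EventPatientCheckedIn",
  "com.example.EventPatientBedChanged",
  "com.example.EventPatientRoomChanged",
  "com.example.EventPatientCheckedOut"
]
```

Each referenced record schema is registered under its own subject and linked to the union via references.)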
These are the topics:
@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic topicEventsLocation() {
        return TopicBuilder.name("adt.events.location").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topicPatientLocation() {
        return TopicBuilder.name("adt.patient.location").partitions(1).replicas(1).build();
    }
}
This is my application.yml (I'm using cp-all-in-one-community as the Docker Compose file):
server:
  port: 9000
spring:
  kafka:
    properties:
      auto:
        register:
          schemas: false
      use:
        latest:
          version: true
      schema:
        registry:
          url: http://localhost:8081
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    streams:
      application-id: kafka-demo
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$LongSerde
        default.value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
And the Spring Boot application:
@SpringBootApplication
@EnableKafkaStreams
public class KafkaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
}
and the Processor:

@Component
public class Processor {

    final StreamsBuilder builder = new StreamsBuilder();

    @Autowired
    public void process() {
        ...
    }
}
The expected behaviour is to see the schema name printed every time a message is produced, and the output topic adt.patient.location populated with the stream processing output, but nothing happens.
I'm new to Kafka, so I'm probably missing something.
UPDATE
I was actually missing the @EnableKafkaStreams annotation.
But now I get the following error:
2021-04-07 16:02:16.967 ERROR 120225 --- [ main] org.apache.kafka.streams.KafkaStreams : stream-client [LocationService-9611eedf-df9b-4fe5-9a7d-058027cee22a] Topology with no input topics will create no stream threads and no global thread.
2021-04-07 16:02:16.967 WARN 120225 --- [ main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'defaultKafkaStreamsBuilder'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
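(This TopologyException usually means the topology handed to the started KafkaStreams instance has no sources: @EnableKafkaStreams exposes a Spring-managed StreamsBuilder bean, and a builder created with new inside the component is never the one Spring starts. A minimal sketch of the Processor with the builder injected instead — the peek/to logic is an assumption based on the expected behaviour described above, not a definitive implementation:

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Processor {

    // Let Spring inject the StreamsBuilder configured by @EnableKafkaStreams;
    // creating one with `new` leaves the started topology empty.
    @Autowired
    public void process(StreamsBuilder builder) {
        KStream<String, GenericRecord> events = builder.stream("adt.events.location");

        events
            // Print the concrete record type of each Avro event.
            .peek((key, value) -> System.out.println(value.getSchema().getName()))
            .to("adt.patient.location");
    }
}
```

Note also that the producer sends String keys ("1", "2", "3") while default.key.serde is LongSerde, so an explicit Consumed.with(Serdes.String(), ...) may be needed on the source.)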