Yes, I have read all the docs I could find and tried every configuration alternative, but this simple example, which should just log a line, does not work
(this is a Spring Boot 2 app with spring-cloud-stream-binder-kafka-streams).
Kafka is storing a String value (null key).
My application.yaml:
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: 'myStreamTopic'
        output:
          producer.keySerde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
      kafka:
        streams:
          binder:
            configuration:
              default.key.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
              default.value.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
            brokers:
              - 'ommited:9092'
              - 'ommited:9092'
              - 'ommited:9092'
            application-id: hack1
Just this simple code as a POC:
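import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@Slf4j
public class HackatonApplication {
    public static void main(String[] args) {
        SpringApplication.run(HackatonApplication.class, args);
    }

    @EnableBinding(KafkaStreamsProcessor.class)
    public static class LineProcessor {
        @StreamListener(Sink.INPUT)
        public void process(KStream<?, String> line) {
            log.info("Received: {}", line);
        }
    }
}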
I cannot get it running; startup fails with:
org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalArgumentException: Trying to invoke public abstract org.apache.kafka.streams.kstream.KStream org.apache.kafka.streams.kstream.KStream.map(org.apache.kafka.streams.kstream.KeyValueMapper) but no delegate has been set.
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:184) ~[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:52) ~[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at
Sorry if this is trivial, but I have spent hours searching, googling, and trying to find a documented solution.
process parameters? --> public void process(KStream<Object, String> line) - Val Bonn
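A note on the wording of the exception in the question: "no delegate has been set" reads like a proxy object being called before any real object was attached to it. The following is only a plain-Java sketch of that general pattern, not the binder's actual code. `DelegateProxySketch`, `Pipe`, and `WrapperHandler` are hypothetical names, and the idea that the output binding behaves this way is an assumption.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.Function;

public class DelegateProxySketch {

    /** Hypothetical stand-in for a stream-like interface such as KStream. */
    public interface Pipe {
        Pipe map(Function<String, String> mapper);
    }

    /** Forwards every call to a delegate, and fails if none has been set yet. */
    static final class WrapperHandler implements InvocationHandler {
        private volatile Pipe delegate;

        void wrap(Pipe delegate) {
            this.delegate = delegate;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (delegate == null) {
                throw new IllegalArgumentException(
                        "Trying to invoke " + method + " but no delegate has been set.");
            }
            return method.invoke(delegate, args);
        }
    }

    /** Creates a Pipe proxy whose calls are routed through the given handler. */
    static Pipe newProxy(WrapperHandler handler) {
        return (Pipe) Proxy.newProxyInstance(
                Pipe.class.getClassLoader(), new Class<?>[] {Pipe.class}, handler);
    }

    /** Uses the proxy before any delegate is attached and returns the error message. */
    public static String callWithoutDelegate() {
        Pipe output = newProxy(new WrapperHandler());
        try {
            output.map(s -> s);
            return "no error";
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    /** Attaches a delegate first and reports whether the call was forwarded to it. */
    public static boolean callWithDelegate() {
        WrapperHandler handler = new WrapperHandler();
        Pipe output = newProxy(handler);
        Pipe real = new Pipe() {
            @Override
            public Pipe map(Function<String, String> mapper) {
                return this;
            }
        };
        handler.wrap(real);
        return output.map(s -> s) == real;
    }

    public static void main(String[] args) {
        System.out.println(callWithoutDelegate());
        System.out.println(callWithDelegate());
    }
}
```

If that assumption holds, it would fit the setup in the question: `KafkaStreamsProcessor` declares an output as well as an input, the failing bean is 'outputBindingLifecycle', and `process` returns void, so nothing would ever be attached to the output side.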