I recently started looking into Spring Cloud Stream for Kafka and have struggled to make the TestBinder work with KStreams. Is this a known limitation, or have I overlooked something?
This works fine:
String processor:
@StreamListener(TopicBinding.INPUT)
@SendTo(TopicBinding.OUTPUT)
public String process(String message) {
    return message + " world";
}
String test:
@Test
@SuppressWarnings("unchecked")
public void testString() {
    Message<String> message = new GenericMessage<>("Hello");
    topicBinding.input().send(message);
    Message<String> received = (Message<String>) messageCollector.forChannel(topicBinding.output()).poll();
    assertThat(received.getPayload(), equalTo("Hello world"));
}
But when I try to use a KStream in my processor, I can't get the TestBinder to work.
KStream processor:
@SendTo(TopicBinding.OUTPUT)
public KStream<String, String> process(
        @Input(TopicBinding.INPUT) KStream<String, String> events) {
    return events.mapValues((value) -> value + " world");
}
KStream test:
@Test
@SuppressWarnings("unchecked")
public void testKstream() {
    Message<String> message = MessageBuilder
            .withPayload("Hello")
            .setHeader(KafkaHeaders.TOPIC, "event.sirism.dev".getBytes())
            .setHeader(KafkaHeaders.MESSAGE_KEY, "Test".getBytes())
            .build();
    topicBinding.input().send(message);
    Message<String> received = (Message<String>)
            messageCollector.forChannel(topicBinding.output()).poll();
    assertThat(received.getPayload(), equalTo("Hello world"));
}
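Side note: the value mapping itself can be checked without any binder. Below is a minimal sketch, assuming a hypothetical helper class `WorldMapper` that is not part of the code above. It only covers the logic of the lambda passed to `mapValues`, not the binding problem I am asking about.

```java
import java.util.function.UnaryOperator;

// Hypothetical helper (an assumption for illustration, not part of the code above).
final class WorldMapper {

    // Same logic as the lambda passed to mapValues in the KStream processor.
    static final UnaryOperator<String> APPEND_WORLD = value -> value + " world";

    private WorldMapper() {
    }

    public static void main(String[] args) {
        // Exercises the mapping as a plain function, no Kafka or Spring involved.
        System.out.println(APPEND_WORLD.apply("Hello")); // prints "Hello world"
    }
}
```

The processor could then presumably call `events.mapValues(WorldMapper.APPEND_WORLD::apply)`, so the same function is used in both places, but that still leaves the TestBinder question open.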
As you might have noticed, I omitted @StreamListener from the KStream processor. Without it, the TestBinder doesn't seem to find the handler, but with it, the application itself doesn't work when started.
Is this a known bug or limitation, or am I doing something wrong here?