My configuration for the consumer is as documented in Spring cloud stream consumer properties documentation.
spring-cloud-dependencies:Finchley.SR1
springBootVersion = '2.0.5.RELEASE'
I have 4 partitions for kstream_test
topic and they are filled with messages from producer as seen below:
root@kafka:/# kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kstream_test --time -1
kstream_test:2:222
kstream_test:1:203
kstream_test:3:188
kstream_test:0:278
My spring cloud stream kafka binder based configuration is:
spring.cloud.stream.bindings.input:
destination: kstream_test
group: consumer-group-G1_test
consumer:
useNativeDecoding: true
headerMode: raw
startOffset: latest
partitioned: true
concurrency: 3
KStream Listener class
@StreamListener
@SendTo(MessagingStreams.OUTPUT)
public KStream<?, ?> process(@Input(MessagingStreams.INPUT) KStream<?, ?> kstreams) {
......
log.info("Got a message");
......
return kstreams;
}
My producer sends 100 messages in 1 run. But the logs seems to have only 1 thread StreamThread-1
handling the messages, though I have concurrency as 3. What might be wrong here ? Is 100 messages not enough to see the concurrency at play ?
2018-10-18 11:50:01.923 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.923 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.945 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.956 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.972 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
UPDATE:
As per the answer, the below num.stream.threads
configuration works at the binder level.
spring.cloud.stream.kafka.streams.binder.configuration:
num.stream.threads: 3