3
votes

I have the following code

// Kafka config setup
Properties props = ...; // setup

List<String> topicList = Arrays.asList("A", "B", "C");

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(topicList);

source
    .map((k, v) -> { /* busy code for mapping data */ })
    .transformValues(new MyGenericTransformer())
    .to((k, v, recordContext) -> { /* busy code for topic routing */ });

new KafkaStreams(builder.build(), props).start();

My problem: when I add more than one topic to subscribe to (i.e. A, B, C above), the KStream code stops receiving records.

References : https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsBuilder.html

Relevant Documentation

public <K,V> KStream<K,V> stream(java.util.Collection<java.lang.String> topics)

"If multiple topics are specified there is no ordering guarantee for records from different topics."

What I'm trying to achieve: have one KStream (i.e. 'source' above) consume and process records from multiple topics.

1
Do you see anything in the logs? Have you enabled debug logging? Also, topics need to be co-partitioned, meaning that the number of partitions, the key, and the partitioning algorithm should be the same for each topic. – Levani Kokhreidze
Your code looks correct. I would look into the log4j output -- did the application rebalance correctly (i.e., were tasks assigned, etc.)? Do you monitor the lag for the input topics? – Matthias J. Sax

1 Answer

2
votes

Do the topics share the same key?

Note that the specified input topics must be partitioned by key. If this is not the case it is the user's responsibility to repartition the data before any key based operation (like aggregation or join) is applied to the returned KStream.

This may be your blocker.
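If the input topics are not co-partitioned, you can re-key and repartition the merged stream before any key-based operation. A minimal sketch, assuming Kafka Streams 2.6+ (for `KStream#repartition`) and a hypothetical `extractKey` function standing in for your own key logic:

```java
import java.util.Arrays;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

public class CoPartitionExample {

    public static void buildTopology(StreamsBuilder builder) {
        // One KStream consuming from all three topics
        KStream<String, String> source =
                builder.stream(Arrays.asList("A", "B", "C"));

        // Re-key so every record uses the same keying scheme,
        // then force a repartition so downstream key-based
        // operations see correctly partitioned data.
        KStream<String, String> repartitioned = source
                .selectKey((key, value) -> extractKey(value))
                .repartition(Repartitioned.as("co-partitioned-input"));

        // Key-based operations (aggregations, joins) are now safe.
        repartitioned.groupByKey().count();
    }

    // Hypothetical key extractor, for illustration only
    private static String extractKey(String value) {
        return value;
    }
}
```

On older versions, the equivalent pattern is `selectKey(...)` followed by writing to and re-reading an explicitly created intermediate topic via `through(...)`.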

Another possible issue may be the consumer group being used.
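Kafka Streams derives its consumer group id from `application.id`, so two applications (or a stale deployment) sharing the same id will be placed in the same group and split the partitions between them. A hedged config sketch, where the id and broker address are example values:

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class ConfigExample {

    public static Properties streamsConfig() {
        Properties props = new Properties();
        // application.id doubles as the consumer group id;
        // make sure no other running app reuses it.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-multi-topic-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return props;
    }
}
```

You can verify what the group is actually consuming (and its lag per topic) with `kafka-consumer-groups.sh --describe --group my-multi-topic-app`.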