I want to create single kafka consumer for several topics. Method constructor for consumer allows me to transfer arguments for a list of topics inside subscription, like that:
private Consumer createConsumer() {
Properties props = getConsumerProps();
Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
ArrayList<String> topicMISL = new ArrayList<>();
for (String s:Connect2Redshift.kafkaTopics) {
topicMISL.add(systemID + "." + s);
}
consumer.subscribe(topicMISL);
return consumer;
}
private boolean consumeMessages( Duration duration, Consumer<String, byte[]> consumer) {
try { Long start = System.currentTimeMillis();
ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(duration);
}
}
Afterwards I want to poll records from kafka into stream every 3 sec and process them, but I wonder what is inside this consumer - how will records from different topics be polled - at first one topic, then another, or in parallel. Could it be that one topic with large amount of messages would be processed all the time and another topic with small amount of messages would wait?