I'm writing a proof-of-concept application that consumes messages from Apache Kafka 0.9.0.0, to see whether I can use it in place of a conventional JMS message broker and take advantage of the benefits Kafka provides. This is my base code, using the new consumer API:
import static java.util.Arrays.asList;

import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Main implements Runnable {

    public static final long DEFAULT_POLL_TIME = 300;
    public static final String DEFAULT_GROUP_ID = "ltmjTest";

    volatile boolean keepRunning = true;

    private KafkaConsumer<String, Object> consumer;
    private String servers;
    private String groupId = DEFAULT_GROUP_ID;
    private long pollTime = DEFAULT_POLL_TIME;
    private String[] topics;

    public Main() {
    }

    // getters and setters...

    // Builds a consumer with auto-commit enabled and subscribes it to the configured topics.
    public void createConsumer() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configs.put("enable.auto.commit", "true");
        configs.put("auto.commit.interval.ms", "1000");
        configs.put("session.timeout.ms", "30000");
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(asList(topics));
    }

    public static void main(String[] args) {
        Main main = new Main();
        if (args != null && args.length > 0) {
            // Arguments are key=value pairs: polltime, groupid, servers, topics (comma-separated).
            for (String arg : args) {
                String[] realArg = arg.trim().split("=", 2);
                String argKey = realArg[0].toLowerCase();
                String argValue = realArg[1];
                switch (argKey) {
                    case "polltime":
                        main.setPollTime(Long.parseLong(argValue));
                        break;
                    case "groupid":
                        main.setGroupId(argValue);
                        break;
                    case "servers":
                        main.setServers(argValue);
                        break;
                    case "topics":
                        main.setTopics(argValue.split(","));
                        break;
                }
            }
            main.createConsumer();
            new Thread(main).start();
            // Block on stdin until "stop" is typed, then tell the consumer thread to finish.
            try (Scanner scanner = new Scanner(System.in)) {
                while (true) {
                    String line = scanner.nextLine();
                    if (line.equals("stop")) {
                        main.setKeepRunning(false);
                        break;
                    }
                }
            }
        }
    }
}
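The run() method is not shown above (like the getters and setters); it's basically a standard poll loop driven by keepRunning, along these lines (a minimal sketch using the 0.9 poll(long) API and the ConsumerRecord/ConsumerRecords classes from org.apache.kafka.clients.consumer):

@Override
public void run() {
    try {
        // Keep polling until the main thread flips keepRunning to false.
        while (keepRunning) {
            ConsumerRecords<String, Object> records = consumer.poll(pollTime);
            for (ConsumerRecord<String, Object> record : records) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    } finally {
        // KafkaConsumer is not thread-safe, so it is closed from the polling thread itself.
        consumer.close();
    }
}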
I've started a Kafka server with the default settings and a producer using the shell tool kafka-console-producer.sh to write messages to my topic. Then I start two consumers with this code, passing the proper server to connect to and the topic to subscribe to, and leaving everything else at its default value, which means both consumers share the same group id. What I see is that only one of the consumers gets all the data. I've read in the official tutorial that the default behaviour should be for the consumers to be balanced by the server:
If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.
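To see how the partitions are actually being assigned to each instance, I suppose I could also pass a ConsumerRebalanceListener to subscribe() and log the assignments on every rebalance; a quick sketch of that variant of the subscribe call in createConsumer() (it needs java.util.Collection, org.apache.kafka.clients.consumer.ConsumerRebalanceListener and org.apache.kafka.common.TopicPartition on top of the imports above):

// Variant of the subscribe() call that logs partition assignments on every rebalance.
consumer.subscribe(asList(topics), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Assigned: " + partitions);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Revoked: " + partitions);
    }
});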
How can I get the consumers to balance the load as the documentation describes? Or am I missing something?