The configuration for the consumer is shown below. Sometimes, for a unique group id, the consumer group is not created at all. I am trying to consume the messages per application name. Even the consumer-groups script does not show that particular consumer group in its list. For example, the group with the given id Application8 is not created at all. What I receive in the logs is shown below.
2019-11-14 14:09:27,719 INFO - Kafka version: 2.3.1 2019-11-14 14:09:27,719 INFO - Kafka commitId: 18a913733fb71c01 2019-11-14 14:09:27,719 INFO - Kafka startTimeMs: 1573720767718 2019-11-14 14:09:27,720 INFO - [Consumer clientId=consumer-1, groupId=Application8] Subscribed to topic(s): config 2019-11-14 14:09:27,955 INFO - [Consumer clientId=consumer-1, groupId=Application8] Cluster ID: h1TJ0oMkQYqO0z8ftlIzpA
/**
 * Runs the blocking consume loop for the "config" topic as consumer group "Application8".
 *
 * <p>Every record value is decoded to a string and handed to {@code Reinit.reInitMethod}. The
 * next offset to read for each partition is tracked in {@code currentOffsets} and committed
 * asynchronously after every non-empty batch. A JVM shutdown hook wakes the consumer so the loop
 * ends; the tracked offsets are then committed synchronously and the consumer is closed.
 *
 * <p>This method does not return until the loop is ended by a shutdown request or an exception.
 *
 * @throws IOException part of the existing signature; retained so callers keep compiling
 */
public static void KafkaServerStart() throws IOException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.0.134:9092");
    props.put("group.id", "Application8");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
    // Offsets are committed explicitly below (commitAsync per batch, commitSync on exit).
    // Leaving auto-commit on would also commit the poll position on close(), which can be
    // ahead of the last record actually processed when the loop exits mid-batch.
    props.put("enable.auto.commit", "false");
    props.put("heartbeat.interval.ms", "3000");
    props.put("session.timeout.ms", "9000");
    props.put("auto.offset.reset", "latest");

    consumer = new KafkaConsumer<String, byte[]>(props);
    consumer.subscribe(Collections.singletonList("config"), new RebalanceConfigListener());

    final Thread mainThread = Thread.currentThread();
    // Set by the shutdown hook so the loop can tell an expected wake-up from a real failure.
    final java.util.concurrent.atomic.AtomicBoolean shutdownRequested =
            new java.util.concurrent.atomic.AtomicBoolean(false);

    // Registering a shutdown hook so we can exit cleanly.
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            System.out.println("Starting exit...");
            shutdownRequested.set(true);
            // The hook runs in a separate thread, so the only thing we can safely do to the
            // consumer is wake it up; the consuming thread then performs commit and close.
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                // Restore the flag so the interruption is not lost to the rest of shutdown.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    });

    try {
        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, byte[]> record : records) {
                byte[] payload = record.value();
                // A record may carry a null value; skip processing it but still advance the
                // offset so one such record cannot terminate the loop.
                if (payload != null) {
                    // NOTE(review): assumes producers encode the payload as UTF-8 — confirm.
                    Reinit.reInitMethod(new String(payload, java.nio.charset.StandardCharsets.UTF_8));
                }
                // Commit position is "next offset to read", hence offset + 1.
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1, "no metadata"));
            }
            if (!records.isEmpty()) {
                consumer.commitAsync(currentOffsets, null);
            }
        }
    } catch (Exception e) {
        if (shutdownRequested.get()) {
            // wakeup() makes the blocked consumer call throw; that is the expected exit path.
            System.out.println("Consumer loop stopped for shutdown: " + e);
        } else {
            e.printStackTrace();
        }
    } finally {
        System.out.println("EXITING KAFKA");
        try {
            // Final synchronous commit of everything processed so far.
            consumer.commitSync(currentOffsets);
        } finally {
            // Always release the consumer, even when the final commit fails.
            consumer.close();
        }
    }
}
/**
 * Command-line entry point: starts the consumer via
 * {@code KafkaConfigConsumer.KafkaServerStart()} and prints the stack trace if that call
 * fails with an {@link IOException}.
 *
 * @param args command-line arguments; not used
 */
public static void main(String[] args) {
    try {
        KafkaConfigConsumer.KafkaServerStart();
    } catch (IOException startFailure) {
        // Report the failure on stderr; there is nothing else to clean up here.
        startFailure.printStackTrace();
    }
}
/**
 * Starts the consumer via {@code KafkaConfigConsumer.KafkaServerStart()} and reports an
 * {@link IOException} from that call through {@code SystemLogger.error}.
 */
@Override
public void run() {
    try {
        KafkaConfigConsumer.KafkaServerStart();
    } catch (IOException startFailure) {
        // Hand the failure to the project logger instead of propagating it out of run().
        SystemLogger.error(startFailure);
    }
}