I have a simple Spring Boot service that is called on demand and consumes a specified number of messages from a topic. The number of messages to consume is passed as a parameter. The service is called every 30 minutes. Each message is ~1.6 KB. I always get around 1100 or 1200 messages each time. There is one topic with only one partition, and the REST service is its only consumer. Here is how the service is called: http://example.com/messages?limit=2000
```java
private OutputResponse getNewMessages(String limit) throws Exception {
    System.out.println("***** START *****");
    final long start = System.nanoTime();
    int loopCntr = 0;
    int counter = 0;
    OutputResponse outputResponse = new OutputResponse();
    Output output = new Output();
    List<Output> rspListObject = new ArrayList<>();
    Consumer<Object, Object> consumer = null;
    String result = null;
    try {
        Properties p = new Properties();
        p.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "180000");
        p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, limit);
        consumer = consumerFactory.createConsumer("my-group-id", null, null, p);
        consumer.assign(Collections.singleton(new TopicPartition("test-topic", 0)));
        while (loopCntr < 2) {
            loopCntr++;
            ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofSeconds(15));
            for (ConsumerRecord<Object, Object> record : consumerRecords) {
                counter++;
                try {
                    // get JSON string
                    result = mapper.writeValueAsString(record.value());
                    // convert JSON to Output
                    output = mapper.readValue(result, Output.class);
                    rspListObject.add(output);
                } catch (Exception e) {
                    logger.error(e);
                    insertToDB(record.value(), record.offset());
                }
            }
        }
        outputResponse.setObjects(rspListObject);
        final long end = System.nanoTime();
        System.out.println("Took: " + ((end - start) / 1000000) + "ms");
        System.out.println("Took: " + (end - start) / 1000000000 + " seconds");
        // commit the offset of records to broker
        if (counter > 0) {
            consumer.commitSync();
        }
    } finally {
        try {
            System.out.println(" >>>>> closing the consumer");
            if (consumer != null)
                consumer.close();
        } catch (Exception e) {
            // log message
        }
    }
    return outputResponse;
}
```
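A side note on the timing output: both printlns use long division, so the "seconds" figure is truncated (anything under one second prints as 0). A minimal sketch that uses java.util.concurrent.TimeUnit for the milliseconds and floating-point division for the seconds; the class and method names are hypothetical:

```java
import java.util.Locale;
import java.util.concurrent.TimeUnit;

class ElapsedTime {

    // Formats a System.nanoTime() difference as whole milliseconds plus
    // fractional seconds. Long division, as in the original println calls,
    // truncates: 1_999_000_000 ns / 1_000_000_000 == 1.
    static String describe(long elapsedNanos) {
        long millis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
        double seconds = elapsedNanos / 1_000_000_000.0;
        // Locale.ROOT keeps "." as the decimal separator regardless of JVM locale.
        return String.format(Locale.ROOT, "Took: %d ms (%.3f seconds)", millis, seconds);
    }

    public static void main(String[] args) throws InterruptedException {
        final long start = System.nanoTime();
        Thread.sleep(50); // stand-in for the poll and processing work
        final long end = System.nanoTime();
        System.out.println(describe(end - start));
    }
}
```

For example, describe(1_999_000_000L) returns "Took: 1999 ms (1.999 seconds)", where the original second println would print "Took: 1 seconds".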
This is what I have in application.yml:
```yaml
spring:
  kafka:
    consumer:
      enable-auto-commit: false
      auto-offset-reset: latest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.trusted.packages: '*'
        max.poll.interval.ms: 300000
      group-id: my-group-id
```
```
ConsumerConfig values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    check.crcs = true
    client.dns.lookup = default
    client.id =
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = my-group-id
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 180000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
```
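As an aside on the message count, a rough back-of-the-envelope estimate, under loudly stated assumptions: ~1.6 KB is taken as 1638 bytes per record, each poll() is assumed to return about one fetch's worth of data from the single partition, and record-batch overhead is ignored. With max.partition.fetch.bytes = 1048576 from the config above, that gives about 640 records per fetch and about 1280 over the loop's two polls, a ceiling that the observed 1100 to 1200 sits just below, which may account for the count. FetchEstimate and its methods are hypothetical names for illustration only:

```java
class FetchEstimate {

    // Rough upper bound on how many records of a given size fit in one
    // per-partition fetch response. Ignores record-batch and per-record
    // overhead, so real counts will be somewhat lower.
    static long recordsPerFetch(long maxPartitionFetchBytes, long recordBytes) {
        if (recordBytes <= 0) {
            throw new IllegalArgumentException("recordBytes must be positive");
        }
        return maxPartitionFetchBytes / recordBytes;
    }

    // Assumption: each poll() returns about one fetch of data from the single
    // partition, capped by max.poll.records.
    static long estimateRecords(int polls, long maxPartitionFetchBytes,
                                long recordBytes, int maxPollRecords) {
        long perPoll = Math.min(recordsPerFetch(maxPartitionFetchBytes, recordBytes),
                                maxPollRecords);
        return polls * perPoll;
    }

    public static void main(String[] args) {
        long recordBytes = 1638; // assumed: 1.6 * 1024, rounded down
        // Two polls (the while loop), logged max.partition.fetch.bytes, limit=2000.
        System.out.println(estimateRecords(2, 1_048_576L, recordBytes, 2000));
    }
}
```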
This is the error I get at commitSync(). I tried limiting poll() to 5 messages, and I tried setting p.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "180000");, but I get the same error:
```
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
```
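The error message attributes the failure to the gap between consecutive poll() calls exceeding max.poll.interval.ms. A minimal sketch of that check over recorded poll timestamps (hypothetical helper names, not Kafka's implementation) suggests that a loop like the one above, with a 15-second poll timeout, would be expected to stay far below 180000 ms unless the per-record work (such as insertToDB) is very slow. Note also that the Kafka documentation describes max.poll.interval.ms as the maximum delay between poll() invocations "when using consumer group management", whereas this method uses assign().

```java
import java.util.List;

class PollIntervalCheck {

    // Largest gap, in ms, between consecutive poll() invocation timestamps.
    // Returns 0 for fewer than two timestamps.
    static long maxGapMs(List<Long> pollTimesMs) {
        long maxGap = 0;
        for (int i = 1; i < pollTimesMs.size(); i++) {
            maxGap = Math.max(maxGap, pollTimesMs.get(i) - pollTimesMs.get(i - 1));
        }
        return maxGap;
    }

    // True if any gap exceeds max.poll.interval.ms, i.e. the condition the
    // CommitFailedException message describes.
    static boolean exceedsMaxPollInterval(List<Long> pollTimesMs, long maxPollIntervalMs) {
        return maxGapMs(pollTimesMs) > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // Hypothetical timestamps: two polls 15.2 s apart (about one poll timeout).
        List<Long> polls = List.of(0L, 15_200L);
        System.out.println(exceedsMaxPollInterval(polls, 180_000L)); // prints false
    }
}
```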