1 vote

I have a two-broker Kafka 1.0.0 cluster, and I am running a Kafka Streams 1.0.0 API application against it. I increased the producer request.timeout.ms to 5 minutes to fix a producer TimeoutException.

Currently I am getting the two types of exceptions below after the application runs for a while. I tried to fix them as suggested in Apache Kafka: TimeoutException and then nothing works, but the solution there was incomplete. Is that solution (decreasing the producer batch.size) recommendable? Please help.

Exception 1

2017-12-08 13:11:55,129 ERROR o.a.k.s.p.i.RecordCollectorImpl [sample-app-0.0.1-156ec0d4-6d7c-40b0-a493-370f8d9a092c-StreamThread-1] task [2_0] Error sending record (key 5a12c529e532af0b84f5d937 value com.kafka.streams.SampleEvent@54a6900d timestamp 1512536799387) to topic abc due to org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.; No more records will be sent and no more offsets will be recorded for this task.
2017-12-08 13:11:55,131 ERROR o.a.k.s.p.i.AssignedTasks [sample-app-0.0.1-156ec0d4-6d7c-40b0-a493-370f8d9a092c-StreamThread-1] stream-thread [sample-app-0.0.1-156ec0d4-6d7c-40b0-a493-370f8d9a092c-StreamThread-1] Failed to process stream task 2_0 due to the following error: org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=2_0, processor=KSTREAM-SOURCE-0000000004, topic=Sample-Event, partition=0, offset=508417
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232)
        at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
        at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [2_0] Abort sending since an error caught with a previous record (key 5a12c529e532af0b84f5d937 value com.kafka.streams.SampleEvent@54a6900d timestamp 1512536799387) to topic abc due to org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms..
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:819)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:100)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
        at org.apache.kafka.streams.kstream.internals.KStreamTransform$KStreamTransformProcessor.process(KStreamTransform.java:56)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
        ... 6 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.

Exception 2

2017-12-11 11:08:35,257 ERROR o.a.k.s.p.i.RecordCollectorImpl [kafka-producer-network-thread | sample-app-0.0.1-030b5133-df00-4abd-a3de-8bfab114f626-StreamThread-1-producer] task [2_0] Error sending record (key 5a12c529e532af0b84f5d937 value com.kafka.streams.SampleEvent@1758de61 timestamp 1512795449471) to topic abc due to org.apache.kafka.common.errors.TimeoutException: Expiring 14 record(s) for abc-0: 122597 ms has passed since last append; No more records will be sent and no more offsets will be recorded for this task.
2017-12-11 11:08:56,001 ERROR o.a.k.s.p.i.AssignedTasks [sample-app-0.0.1-030b5133-df00-4abd-a3de-8bfab114f626-StreamThread-1] stream-thread [sample-app-0.0.1-030b5133-df00-4abd-a3de-8bfab114f626-StreamThread-1] Failed to commit stream task 2_0 due to the following error: org.apache.kafka.streams.errors.StreamsException: task [2_0] Abort sending since an error caught with a previous record (key 5a12c529e532af0b84f5d937 value com.kafka.streams.SampleEvent@1758de61 timestamp 1512795449471) to topic abc due to org.apache.kafka.common.errors.TimeoutException: Expiring 14 record(s) for abc-0: 122597 ms has passed since last append.
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
        at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 14 record(s) for abc-0: 122597 ms has passed since last append
4
You can increase max.block.ms to get a larger timeout for the first exception. – Matthias J. Sax

4 Answers

1 vote

We faced similar issues, which we resolved as follows. First issue: set max.block.ms to something higher than the currently configured value.

Second issue: increase batch.size and decrease linger.ms on the Kafka producer side (this can affect latency). Decreasing linger.ms hands batches to the sender sooner, so more batches are sent with fewer messages in each, instead of records sitting in the accumulator until they expire.
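A minimal sketch of those producer overrides (the broker list and the concrete values are illustrative assumptions, not tuned recommendations):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // Block up to 2 minutes (default 60000 ms) before send() fails with
    // "Failed to allocate memory within the configured max blocking time".
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "120000");
    // Allow more bytes per batch (default 16384).
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, "32768");
    // Keep linger.ms low so batches are handed to the sender promptly.
    props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);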

0 votes

This looks like something that often happens when an expected topic hasn't been created. Try looking further back in the log files.

You can also explicitly use the admin client to check which topics exist.
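For example, with the Java AdminClient (the broker address is a placeholder):

    import java.util.Properties;
    import java.util.Set;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
    try (AdminClient admin = AdminClient.create(props)) {
        // names() returns a KafkaFuture<Set<String>>; get() blocks until the
        // cluster answers (and can throw InterruptedException/ExecutionException).
        Set<String> topics = admin.listTopics().names().get();
        System.out.println("Topics: " + topics);
        System.out.println("abc exists? " + topics.contains("abc"));
    }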

0 votes

The first issue happens because the producer waits up to 60,000 ms (the default value) for topic metadata to become available. If the metadata isn't present within that time, it throws the Streams TimeoutException. To fix this, set the Kafka producer config ProducerConfig.MAX_BLOCK_MS_CONFIG to some value greater than 60000 ms. This will resolve the issue.
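In a Kafka Streams application you can forward that setting to the internally created producer with the producer prefix. A short sketch (the application id, broker list, and the 120000 ms value are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sample-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
    // producerPrefix() turns this into "producer.max.block.ms", so only the
    // embedded producer (not the consumer) picks it up.
    props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), "120000");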

0 votes

If you are using NiFi and Kafka with SASL_SSL without Kerberos, and you are providing a Kafka client JAAS configuration, then increase the Metadata Wait Time to 100 sec and the Acknowledgment Wait Time to 100 sec. This worked for us.