
After enabling exactly-once processing in a Kafka Streams application, the following error appears in the logs:

ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer 
due to the following error:

org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort 
sending since an error caught with a previous record (key 222222 value 
some-value timestamp 1519200902670) to topic exactly-once-test-topic- 
v2 due to This exception is raised by the broker if it could not 
locate the producer metadata associated with the producerId in 
question. This could happen if, for instance, the producer's records 
were deleted because their retention time had elapsed. Once the last 
records of the producerId are removed, the producer's metadata is 
removed from the broker, and future appends by the producer will 
return this exception.
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
  at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
  at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
  at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
  at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
  at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
  at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
  at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
  at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
  at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
  at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
  at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException

We've reproduced the issue with a minimal test case where we move messages from a source stream to another stream without any transformation. The source stream contains millions of messages produced over several months. The KafkaStreams object is created with the following StreamsConfig:

  • StreamsConfig.PROCESSING_GUARANTEE_CONFIG = "exactly_once"
  • StreamsConfig.APPLICATION_ID_CONFIG = "Some app id"
  • StreamsConfig.NUM_STREAM_THREADS_CONFIG = 1
  • ProducerConfig.BATCH_SIZE_CONFIG = 102400
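
For readers who want to reproduce the setup, the settings listed above can be sketched as a plain java.util.Properties object. This is only an illustration, not the asker's actual code: the string keys are the values behind the StreamsConfig and ProducerConfig constants, and the names ExactlyOnceConfigSketch and buildConfig are made up for this sketch. A real application would also need at least bootstrap.servers and serdes, which the question does not show.

```java
import java.util.Properties;

class ExactlyOnceConfigSketch {

    // Hypothetical helper: collects the four settings listed in the question.
    static Properties buildConfig() {
        Properties props = new Properties();
        // StreamsConfig.PROCESSING_GUARANTEE_CONFIG
        props.put("processing.guarantee", "exactly_once");
        // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("application.id", "Some app id");
        // StreamsConfig.NUM_STREAM_THREADS_CONFIG
        props.put("num.stream.threads", 1);
        // ProducerConfig.BATCH_SIZE_CONFIG (bytes)
        props.put("batch.size", 102400);
        return props;
    }

    public static void main(String[] args) {
        // Print the settings so they can be compared with the list above.
        buildConfig().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting Properties object is what would typically be passed to the KafkaStreams constructor together with the topology.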

The app is able to process some messages before the exception occurs.

Context information:

  • We're running a 5-node Kafka 1.1.0 cluster with 5 ZooKeeper nodes.
  • There are multiple instances of the app running.

Has anyone seen this problem before, or can anyone give us a hint about what might be causing this behaviour?

Update

We created a new 1.1.0 cluster from scratch and started to process new messages without problems. However, when we imported old messages from the old cluster, we hit the same UnknownProducerIdException after a while.

Next, we set cleanup.policy on the sink topic to compact while keeping retention.ms at 3 years. With this setting the error did not occur, but messages seem to have been lost: the source offset is 106 million and the sink offset is 100 million.

The producer ID is stored directly in the log, so if all your data gets deleted, the PID can be lost. What is your retention time, and what timestamps does your data have? - Matthias J. Sax
The retention time of the source topic and destination topic is set to 3 years. What do you mean by the producer ID being stored in the log? - Odinodin
To be specific, we have set retention.ms to 3 years and cleanup.policy to delete. - Odinodin
Are the message timestamps older than the retention time? The producer ID of the writing producer is stored with each message, and the log is used as the "source of truth" for which PIDs are known (PIDs are not stored anywhere else). It might also be a bug on the broker or the producer... Not sure. - Matthias J. Sax
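
The question raised in the comments (are the message timestamps older than the retention time?) comes down to simple arithmetic, which the sketch below illustrates. It is a simplification under stated assumptions: the broker applies time-based retention to whole log segments rather than to individual records, "3 years" is approximated as 1095 days, and the names RetentionCheckSketch and isOlderThanRetention are hypothetical. The record timestamp is the one from the error message in the question.

```java
import java.time.Duration;

class RetentionCheckSketch {

    // Hypothetical helper: true if a record with this timestamp is already
    // older than the topic's retention.ms at time nowMs.
    static boolean isOlderThanRetention(long recordTimestampMs, long retentionMs, long nowMs) {
        return nowMs - recordTimestampMs > retentionMs;
    }

    public static void main(String[] args) {
        // Assumption: "3 years" approximated as 3 * 365 days.
        long retentionMs = Duration.ofDays(3 * 365).toMillis();
        // Timestamp taken from the error message in the question.
        long recordTimestampMs = 1519200902670L;
        long nowMs = System.currentTimeMillis();

        System.out.println("older than retention: "
                + isOlderThanRetention(recordTimestampMs, retentionMs, nowMs));
    }
}
```

If a check like this returns true for records being replayed into the sink topic, those records are eligible for deletion soon after being written, which is the situation the comments describe as a way for the PID to be lost.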

1 Answer


As explained in the comments, there currently seems to be a bug that may cause problems when replaying messages older than the (maximum configurable?) retention time.

At the time of writing this is unresolved; the latest status can be seen here:

https://issues.apache.org/jira/browse/KAFKA-6817