By using kafka connector i'm writing the avro formatted data into kafka topic and then by using kafka streams i'm mapping some values and writing the output to some other topic by using:
Stream.to("output_topic");
My data is writing in to the output topic, but i'm facing issue with offset. If i have 25 records in my input topic, It is writing all the 25 records into my output topic but throwing an error as:
[2018-06-25 12:42:50,243] ERROR [ConsumerFetcher consumerId=console-consumer-3500_kafka-connector-1529910768088-712e7106,leaderId=0, fetcherId=0]Error due to(kafka.consumer.ConsumerFetcherThread) kafka.common.KafkaException: Error processing data for partition Stream-0 offset 25
Here is my full error:
> [2018-06-25 12:42:50,243] ERROR [ConsumerFetcher
> consumerId=console-consumer-3500_kafka-connector-1529910768088-712e7106,
> leaderId=0, fetcherId=0] Error due to
> (kafka.consumer.ConsumerFetcherThread) kafka.common.KafkaException:
> Error processing data for partition Stream-0 offset 25 at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
> at scala.Option.foreach(Option.scala:257) at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
> at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: java.lang.IllegalArgumentException: Illegal batch type
> class org.apache.kafka.common.record.DefaultRecordBatch. The older
> message format classes only support conversion from class
> org.apache.kafka.common.record.AbstractLegacyRecordBatch, which is
> used for magic v0 and v1 at
> kafka.message.MessageAndOffset$.fromRecordBatch(MessageAndOffset.scala:29)
> at
> kafka.message.ByteBufferMessageSet$$anonfun$internalIterator$1.apply(ByteBufferMessageSet.scala:169)
> at
> kafka.message.ByteBufferMessageSet$$anonfun$internalIterator$1.apply(ByteBufferMessageSet.scala:169)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at
> scala.collection.Iterator$class.toStream(Iterator.scala:1320) at
> scala.collection.AbstractIterator.toStream(Iterator.scala:1334) at
> scala.collection.TraversableOnce$class.toSeq(TraversableOnce.scala:298)
> at scala.collection.AbstractIterator.toSeq(Iterator.scala:1334) at
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:59)
> at
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:87)
> at
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:37)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:183)
> ... 15 more
message.format.version
andlog.message.format.version
. Also, did you upgrad your brokers at some point and work with topic that were create before the upgrade? – Matthias J. Sax