I'm using a Kafka connector to write Avro-formatted data into a Kafka topic. I then use Kafka Streams to map some values and write the output to another topic with:

Stream.to("output_topic");

The data does get written to the output topic, but I'm facing an issue with offsets. If I have 25 records in my input topic, all 25 records are written to the output topic, but then the following error is thrown:

[2018-06-25 12:42:50,243] ERROR [ConsumerFetcher consumerId=console-consumer-3500_kafka-connector-1529910768088-712e7106, leaderId=0, fetcherId=0] Error due to (kafka.consumer.ConsumerFetcherThread)

kafka.common.KafkaException: Error processing data for partition Stream-0 offset 25

Here is my full error:

> [2018-06-25 12:42:50,243] ERROR [ConsumerFetcher consumerId=console-consumer-3500_kafka-connector-1529910768088-712e7106, leaderId=0, fetcherId=0] Error due to (kafka.consumer.ConsumerFetcherThread)
> kafka.common.KafkaException: Error processing data for partition Stream-0 offset 25
>   at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>   at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
>   at scala.Option.foreach(Option.scala:257)
>   at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>   at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
>   at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>   at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>   at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
>   at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)

> Caused by: java.lang.IllegalArgumentException: Illegal batch type class org.apache.kafka.common.record.DefaultRecordBatch. The older message format classes only support conversion from class org.apache.kafka.common.record.AbstractLegacyRecordBatch, which is used for magic v0 and v1
>   at kafka.message.MessageAndOffset$.fromRecordBatch(MessageAndOffset.scala:29)
>   at kafka.message.ByteBufferMessageSet$$anonfun$internalIterator$1.apply(ByteBufferMessageSet.scala:169)
>   at kafka.message.ByteBufferMessageSet$$anonfun$internalIterator$1.apply(ByteBufferMessageSet.scala:169)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
>   at scala.collection.Iterator$class.toStream(Iterator.scala:1320)
>   at scala.collection.AbstractIterator.toStream(Iterator.scala:1334)
>   at scala.collection.TraversableOnce$class.toSeq(TraversableOnce.scala:298)
>   at scala.collection.AbstractIterator.toSeq(Iterator.scala:1334)
>   at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:59)
>   at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:87)
>   at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:37)
>   at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:183)
>   ... 15 more
Sounds like a version mismatch... What are your broker, message format, Connect, and Kafka Streams versions? - Matthias J. Sax
Broker: confluent-4.1.0; message format: Avro; Connect: kafka-connect-jdbc; Kafka Streams: 1.0.0-cp1 - Sujitha Chinnu
By "message format" I did not mean your data type (that seems to be Avro), but the Kafka message format: kafka.apache.org/documentation/#messageformat There is a topic-level config message.format.version and a broker-level config log.message.format.version. Also, did you upgrade your brokers at some point and are you working with topics that were created before the upgrade? - Matthias J. Sax

1 Answer


I got the same error while using kafka-console-consumer.sh.

The problem was the --zookeeper option. If you pass --zookeeper, the old consumer is started by default, and it only understands message format (magic) v0 or v1, while the current Kafka version (1.1) uses v2. That is why the version mismatch occurs.

You can solve this error by using the --bootstrap-server option instead of --zookeeper, which runs the new consumer.

When you pass --bootstrap-server, you must supply each broker's hostname (or IP) and port number, e.g. --bootstrap-server kafka.domain:9092,kafka2.domain:9092

The broker (Kafka server) listens on port 9092 by default; you can change the port in kafka/config/server.properties.
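
As a rough sketch of the fix described above, the snippet below assembles and prints a console-consumer command that uses --bootstrap-server instead of --zookeeper. The broker addresses are the example hosts from this answer, output_topic is the topic name from the question, and the bin/kafka-console-consumer.sh path is an assumption about where the script lives in your installation, so adjust all three to your setup.

```shell
# Example broker list from the answer above; replace with your own brokers.
BOOTSTRAP_SERVERS="kafka.domain:9092,kafka2.domain:9092"
# Topic name taken from the question.
TOPIC="output_topic"
# Assumed location of the script; it may differ in your installation.
CONSUMER="bin/kafka-console-consumer.sh"

# --bootstrap-server starts the new consumer, which can read message format v2.
# Passing --zookeeper here instead would start the old consumer and trigger the error.
CMD="$CONSUMER --bootstrap-server $BOOTSTRAP_SERVERS --topic $TOPIC --from-beginning"

# Print the command; run the printed line on a machine with Kafka installed.
echo "$CMD"
```

The snippet only prints the command, so it is safe to try without a running cluster. Note that Avro-encoded values will likely show up as raw bytes in the plain console consumer.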