
I'm trying to write JSON with Kafka HDFS Sink.

I have the following properties (connect-standalone.properties):

key.converter.schemas.enable = false
value.converter.schemas.enable = false
schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

And on my properties:

format.class=io.confluent.connect.hdfs.json.JsonFormat
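For reference, a minimal standalone HDFS sink connector configuration combining these settings might look like the following sketch (the connector name, topic, and HDFS URL are placeholders you would replace with your own):

```properties
name=hdfs-json-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=my-topic
hdfs.url=hdfs://namenode:8020
flush.size=3
format.class=io.confluent.connect.hdfs.json.JsonFormat
```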

And I got the following exception:

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error

... Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'test': was expecting 'null', 'true', 'false' or NaN at [Source: (byte[])"test"; line: 1, column: 11]

My JSON is valid.

How can I solve this, please?

I have also tried with a sample JSON message like:

{"key":"value"}

And I still get the same error.

Thanks.

Comments:

- The data in your topic needs to be JSON. It looks like one message in the topic is only "test" (OneCricketeer)
- Can you show both connect property files and the output of the console consumer from the beginning of the topic? (OneCricketeer)

1 Answer


According to the error, not all of the messages in the topic are actually JSON objects. The most recent messages might be valid, or the Kafka record values might be valid (though not the keys), but the error shows that the converter tried to read a plain string, (byte[])"test", which is not valid JSON.
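If only the occasional record is malformed, you could also use Kafka Connect's error-handling properties (available since Kafka 2.0) to skip bad records or route them to a dead letter queue instead of failing the task. A sketch, to be added to the sink connector's properties (the DLQ topic name is a placeholder):

```properties
# Tolerate conversion errors instead of killing the task
errors.tolerance=all
errors.log.enable=true
# Route failed records to a dead letter queue topic for inspection
errors.deadletterqueue.topic.name=hdfs-sink-dlq
errors.deadletterqueue.context.headers.enable=true
```

Note this skips the bad records entirely; the underlying data in the topic still is not JSON.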

If you only want plain-text data in HDFS, you could use the String format instead; however, that won't have Hive integration.

format.class=io.confluent.connect.hdfs.string.StringFormat

If you did want to use Hive with this format, you would need to define the JSON SerDe yourself.
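For example, a hedged sketch of an external Hive table over the connector's output, assuming Hive's bundled org.apache.hive.hcatalog.data.JsonSerDe is on the classpath and that the table name, columns, and HDFS path are placeholders matching your own topic layout:

```sql
CREATE EXTERNAL TABLE my_topic_json (`key` STRING)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/topics/my-topic';
```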