2
votes

I want to use kafka-connect-hdfs for writing schemaless json records from kafka to hdfs file. If I am using JsonConvertor as key/value convertor then it is not working. But if I am using StringConvertor then it writes the json as escaped string.

For example:

actual json -

{"name":"test"}

data written to hdfs file -

"{\"name\":\"test\"}"

expected output to hdfs file -

{"name":"test"}

Is there any way or alternative I can achieve this or I have to use it with schema only?

Below is the exception I get when I try to use JSONConvertor:

[2017-09-06 14:40:19,344] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:406)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Configuration of quickstart-hdfs.properties:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs_avro
hdfs.url=hdfs://localhost:9000
flush.size=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

Configuration of connect-avro-standalone.properties:

bootstrap.servers=localhost:9092
schemas.enable=false
key.converter.schemas.enable=false
value.converter.schemas.enable=false
1
"If I am using JsonConvertor as key/value convertor then it is not working." -> can you describe the issue you're seeing? the error you get?Robin Moffatt
@RobinMoffatt I have added the exception and properties I have configured for this connector.Nandish Kotadia

1 Answers

1
votes

When you specify a converter in you connector's configuration properties you need to include all the properties pertaining to this converter, regardless of whether such properties are included in the worker's config too.

In the above example, you'll need to specify both:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

in quickstart-hdfs.properties.

FYI, JSON export is coming up in the HDFS Connector soon. Track the related pull request here: https://github.com/confluentinc/kafka-connect-hdfs/pull/196

Update: JsonFormat has been merged to master branch.