0
votes

I'm trying to use Kafka connect sink to write files from Kafka to HDFS.

My properties looks like:

connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
flush.size=3
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
schema.compatability=BACKWARD
key.converter.schemas.enabled=false
value.converter.schemas.enabled=false
schemas.enable=false

And When I'm trying to run the connector I got the following exception:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

I'm using Confluent version 4.0.0.

Any suggestions please?

1
@cricket_007, If my Json isn't with "schema" and "payload", How can I write Parquet file anyway?Ya Ko
I don't think you can. Parquet requires a Schema and last time I checked, the Kafka Connect code from Confluent uses the Avro libraries to help convert the Kafka message into Parquet filesOneCricketeer
You would need to produce Avro into the topic to begin with, using the Schema Registry. Otherwise, you must add the schema field to the JSON message. Alternatively, use JSONFormat rather than Parquet, then use Hive, Spark, whatever to convert to Parquet later. In any option you choose, the schema needs defined, but that's not a property that is added in the Connect frameworkOneCricketeer
Sounds like you got it. There's more options like using Kafka Streams or KSQL to convert the JSON topic into an Avro topic, and then using Connect, but that assumes that you cannot change the producer code and are able to reliably deploy those servicesOneCricketeer
I haven't tried it, but that's what the error is trying to tell youOneCricketeer

1 Answers

0
votes

My understanding of this issue is that if you set schemas.enable=true, you tell kafka that you would like to include the schema into messages that kafka must transfer. In this case, a kafka message does not have a plain json format. Instead, it first describes the schema and then attaches the payload (i.e., the actual data) that corresponds to the schema (read about AVRO formatting). And this leads to the conflict: On the one hand you've specified JsonConverter for your data, on the other hand you ask kafka to include the schema into messages. To fix this, you can either use AvroConverter with schemas.enable = true or JsonCOnverter with schemas.enable=false.