
I have installed the Docker image confluentinc/cp-kafka-connect:4.0.0 on my virtual machine. I want to get a Kafka topic containing plain-text data (string format) into HDFS in Parquet format.

sudo docker run -d \
    --name=kafka-connect \
    --net=host \
    -e CONNECT_BOOTSTRAP_SERVERS=<kafka-server-host>:9092 \
    -e CONNECT_REST_PORT=8082 \
    -e CONNECT_GROUP_ID="connect-kafkac1" \
    -e CONNECT_CONFIG_STORAGE_TOPIC="connect-kafkac1-config" \
    -e CONNECT_OFFSET_STORAGE_TOPIC="connect-kafkac1-offsets" \
    -e CONNECT_STATUS_STORAGE_TOPIC="connect-kafkac1-status" \
    -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
    -e CONNECT_LOG4J_LOGGERS=org.reflections=ERROR \
    -e CONNECT_PLUGIN_PATH=/usr/share/java \
    confluentinc/cp-kafka-connect:4.0.0
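To sanity-check that the worker came up, I can list its connectors through the Connect REST API on the port configured above (this returns an empty list until a connector has been created):

curl http://localhost:8082/connectors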

I have the following configuration files set up.

/etc/kafka/connect-standalone.properties

bootstrap.servers=kafkaclusteraddress:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

/etc/kafka-connect-hdfs/quickstart-hdfs.properties

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=<topicname>
topics.dir=<hdfs-topic-dir>
logs.dir=<hdfs-logs-dir>
hdfs.url=<hdfs-url>:8020
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
flush.size=3
hadoop.conf.dir=/etc/hadoop/conf
hadoop.home=/usr/bin/hadoop

This is the error I receive after running the connector in standalone mode:

# connect-standalone /etc/kafka/connect-standalone.properties /etc/kafka-connect-hdfs/quickstart-hdfs.properties


[2018-02-13 19:11:09,542] ERROR WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.IllegalArgumentException: Avro schema must be a record.
        at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:113)
        at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:144)
        at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:106)
        at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:68)
        at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:635)
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:379)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:101)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
[2018-02-13 19:11:09,545] ERROR WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:517)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Avro schema must be a record.
        at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:113)
        at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:144)
        at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:106)
        at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:68)
        at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:635)
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:379)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:101)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
        ... 10 more
[2018-02-13 19:11:09,545] ERROR WorkerSinkTask{id=hdfs-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

1 Answer


If you look at the stacktrace, the Parquet writer expects an Avro record, not a plain string.

IllegalArgumentException: Avro schema must be a record.
        at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:113)
        at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:144)
        at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:106)

I've not personally tested JSON to Parquet, but since Parquet is a columnar format, ParquetFormat requires that each record carry a schema. Because you have schemas.enable=false, no schema is attached, so the connector cannot consume plain strings, ints, booleans, or other "non-structured" Connect API types.

These are the settings that need to be altered:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
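A minimal sketch of one replacement, assuming you switch to the JsonConverter with schemas enabled (the payload shape this requires is described next):

# assumes the producer emits the schema/payload envelope shown below
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true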

With the JsonConverter and schemas.enable=true, you must produce data that looks like {"schema": {...}, "payload": {...}}, where the schema field contains the type definition of the object in the payload field.
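For example, a message whose value is a struct with a single string field might look like the following (the field name and record name here are hypothetical):

{
  "schema": {
    "type": "struct",
    "name": "record",
    "optional": false,
    "fields": [
      {"field": "name", "type": "string", "optional": false}
    ]
  },
  "payload": {
    "name": "some-value"
  }
}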

Otherwise, if you have control over the producer code, you should send Avro data instead, which will require using the Schema Registry.
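A sketch of that alternative, assuming a Schema Registry is reachable at the placeholder address <schema-registry-host>:8081:

# assumes Avro-serialized messages and a running Schema Registry
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://<schema-registry-host>:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://<schema-registry-host>:8081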


I would also mention that since you have just string records, Parquet won't offer much benefit over a raw text file on HDFS, since there will only be a single column anyway.