
My currently working setup:

NiFi streams Avro messages (with Confluent Schema Registry references) to Kafka (v2.0.0, 20 partitions, Confluent v5.0.0), and a Kafka Connect worker (HDFS sink) writes these messages to HDFS in Parquet format with flush.size=70000.

My problem:

This configuration works fine, but when I change the config to flush.size=1000000 (because 70,000 messages come to only 5-7 MB at most, while the Parquet file block size is 256 MB), the Connect worker returns "Error sending fetch request" errors:

...
[2019-05-24 14:00:21,784] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=1661483807, epoch=374) to node 3: java.io.IOException: Connection to 3 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler)
[2019-05-24 14:00:21,784] WARN [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={mytopic-10=(offset=27647797, logStartOffset=24913298, maxBytes=1048576), mytopic-16=(offset=27647472, logStartOffset=24913295, maxBytes=1048576), mytopic-7=(offset=27647429, logStartOffset=24913298, maxBytes=1048576), mytopic-4=(offset=27646967, logStartOffset=24913296, maxBytes=1048576), mytopic-13=(offset=27646404, logStartOffset=24913298, maxBytes=1048576), mytopic-19=(offset=27648276, logStartOffset=24913300, maxBytes=1048576), mytopic-1=(offset=27647036, logStartOffset=24913307, maxBytes=1048576)}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1661483807, epoch=374)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 3 was disconnected before the response was read
...
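
For a sense of scale, rough arithmetic from the sizes above:

70,000 messages ~ 5-7 MB        ->  roughly 75-100 bytes per message
256 MB / 75-100 bytes           ->  roughly 2.6-3.6 million messages per 256 MB file
flush.size=1000000              ->  files of only ~75-100 MB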

My configs:

HDFS Connector config:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
tasks.max=1
topics=mytopic
hdfs.url=hdfs://hdfsnode:8020/user/someuser/kafka_hdfs_sink/
flush.size=1000000
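
If the goal is to bound files by time rather than only by record count, the HDFS sink also has rotation settings; a sketch (the value here is illustrative, check the property against your connector version):

# close and commit an open file after ~10 minutes, regardless of record count
rotate.interval.ms=600000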

Kafka Connect Worker config:

bootstrap.servers=confleuntnode1:9092,confleuntnode2:9092,confleuntnode3:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://confleuntnode:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/opt/confluent/current/share/java/

My question:

How can I write larger files (more messages per file) from Kafka to HDFS using the Kafka Connect worker?

You could use a time-based partitioner instead to get hourly partitions, for example, rather than capping the files at a fixed number of events; I've managed to get multi-GB files in S3 that way (a sketch is shown after these comments). - OneCricketeer
@cricket_007, I know, but my question is about the error. How do I change my config to make it work with flush.size=1000000? - deeplay
It should just work... You might want to look at increasing heap sizes, though. Personally, we end up keeping the small files in Avro, then periodically batch overwrite & load them into other Parquet tables. - OneCricketeer
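
A time-based partitioner, as suggested in the first comment, might look roughly like this in the connector properties (a sketch; the partitioner class and property names should be checked against your connector version):

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
# one output directory per hour
partition.duration.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en-US
timezone=UTC
# partition by the record (event) timestamp; use Wallclock for processing time instead
timestamp.extractor=Record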

1 Answer


I solved this issue by running Connect in distributed mode (instead of standalone). Now I'm able to write up to 3.5 million records (~256 MB) per file to HDFS. But there are new issues with that: 1) very slow processing speed (35 million records per hour); 2) inability to write Parquet files bigger than 256 MB. I'll post new SO questions.
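
A rough sketch of the distributed-mode setup (topic names and replication factors here are illustrative; the connector config itself is the same as above):

# connect-distributed.properties
bootstrap.servers=confleuntnode1:9092,confleuntnode2:9092,confleuntnode3:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://confleuntnode:8081
# distributed mode keeps offsets/configs/status in Kafka topics instead of a local file
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
offset.storage.replication.factor=3
config.storage.replication.factor=3
status.storage.replication.factor=3
plugin.path=/opt/confluent/current/share/java/

The worker is then started with connect-distributed and the connector is submitted through the Connect REST API (default port 8083):

connect-distributed connect-distributed.properties

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  -d '{"name": "hdfs-sink", "config": {"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat", "tasks.max": "1", "topics": "mytopic", "hdfs.url": "hdfs://hdfsnode:8020/user/someuser/kafka_hdfs_sink/", "flush.size": "1000000"}}'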