I have a Kafka topic and I would like to feed it with Avro data (it currently holds JSON). I know the "proper" way to do this is to use the Schema Registry, but for testing purposes I would like to make it work without one.

So I am sending Avro data as Array[Byte] instead of regular JSON objects:

    import java.io.{ByteArrayOutputStream, File}
    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericDatumWriter}
    import org.apache.avro.io.EncoderFactory

    // parse the schema first; the DatumWriter constructor takes a Schema, not a file name
    val schema = new Schema.Parser().parse(new File("mySchema.avsc"))
    val writer = new GenericDatumWriter[GenericData.Record](schema)
    val out = new ByteArrayOutputStream
    val encoder = EncoderFactory.get.binaryEncoder(out, null)
    writer.write(myAvroData, encoder)
    encoder.flush()
    out.close()
    out.toByteArray
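
For reference, here is a minimal sketch of how these bytes could then be produced to the topic; the broker address is an assumption, and avroBytes stands for the array returned by the snippet above:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumption: local broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
    // avroBytes is the Array[Byte] returned by the serialization snippet above
    producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("user_sync", avroBytes))
    producer.close()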

The schema is embedded with each record; how can I make this work with Kafka Connect? The Kafka Connect configuration currently has the following properties (data is written to S3 as json.gz files), and I want to write Parquet files instead:


    {
      "name": "someName",
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "120",
      "topics": "user_sync",
      "s3.region": "someRegion",
      "s3.bucket.name": "someBucket",
      "s3.part.size": "5242880",
      "s3.compression.type": "gzip",
      "filename.offset.zero.pad.width": "20",
      "flush.size": "5000",
      "rotate.interval.ms": "600000",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
      "path.format": "YYYY/MM/dd/HH",
      "timezone" : "UTC",
      "locale": "en",
      "partition.duration.ms": "600000",
      "timestamp.extractor": "RecordField",
      "timestamp.field" : "ts",
      "schema.compatibility": "NONE"
    }

I suppose I need to change "format.class" to "io.confluent.connect.hdfs.parquet.ParquetFormat"? But is that enough?

Thanks a lot!

1 Answer


JsonConverter will be unable to consume Avro-encoded data, since it only understands JSON. The registry-based AvroConverter won't work here either: its binary wire format carries a schema ID from the registry that has to be extracted before the converter can determine what the data looks like.
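
To illustrate, here is a minimal sketch (in the same Scala as the question) of the framing that registry-aware converters expect; schemaId and avroPayload are placeholders for illustration:

    import java.nio.ByteBuffer

    // Confluent wire format: a zero "magic" byte, a 4-byte big-endian schema ID,
    // then the raw Avro payload. The plain Avro bytes from the question carry no
    // such prefix, which is why registry-aware converters cannot decode them.
    def confluentFrame(schemaId: Int, avroPayload: Array[Byte]): Array[Byte] =
      ByteBuffer.allocate(5 + avroPayload.length)
        .put(0: Byte)      // magic byte
        .putInt(schemaId)  // schema registry ID
        .put(avroPayload)
        .array()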

You'll want to use the registryless-avro-converter, which will create a Connect Struct with an attached schema, which can then be converted into Parquet records.
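
As a sketch of the relevant property changes, assuming the converter class and "schema.path" setting from the registryless-avro-converter README, together with the S3 connector's own Parquet format class (available in recent versions of kafka-connect-s3):

    "value.converter": "me.frmr.kafka.connect.RegistrylessAvroConverter",
    "value.converter.schema.path": "/path/to/mySchema.avsc",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat"

Here "/path/to/mySchema.avsc" is a placeholder for wherever the schema file lives on the Connect workers. Note also that with Parquet output, compression is typically configured via "parquet.codec" rather than "s3.compression.type".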