1 vote

I'm trying to create a Kafka Connect connector to sink from an Avro topic to a file.

And then restore this file to another topic using Kafka Connect.

The sink is working fine: I can see the sink file growing and read the data. But when I try to restore to a new topic, the new topic stays empty.

And I get no errors. I already reset the offsets, created a new Kafka Connect worker and tried to restore, and even created a completely new Kafka cluster, but it's always the same: no error on the source connector, yet the topic is empty.

Here is the output of the source connector config:

{
  "name": "restored-exchange-rate-log",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "value.converter.schema.registry.url": "http://kafka-schema:8881",
    "file": "/tmp/exchange-rate-log.sink.txt",
    "format.include.keys": "true",
    "source.auto.offset.reset": "earliest",
    "tasks.max": "1",
    "value.converter.schemas.enable": "true",
    "name": "restored-exchange-rate-log",
    "topic": "restored-exchange-rate-log",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  },
  "tasks": [
    {
      "connector": "restored-exchange-rate-log",
      "task": 0
    }
  ],
  "type": "source"
}

And here is the output of the source connector status:

{
  "name": "restored-exchange-rate-log",
  "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8883"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "kafka-connect:8883"
    }
  ],
  "type": "source"
}

Here is the output of the sink connector config:

{
  "name": "bkp-exchange-rate-log",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "source.auto.offset.reset": "earliest",
    "tasks.max": "1",
    "topics": "exchange-rate-log",
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy",
    "value.converter.schema.registry.url": "http://kafka-schema:8881",
    "file": "/tmp/exchange-rate-log.sink.txt",
    "format.include.keys": "true",
    "value.converter.schemas.enable": "true",
    "name": "bkp-exchange-rate-log",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  },
  "tasks": [
    {
      "connector": "bkp-exchange-rate-log",
      "task": 0
    }
  ],
  "type": "sink"
}

Here is the output of the sink connector status:

{
  "name": "bkp-exchange-rate-log",
  "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8883"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "kafka-connect:8883"
    }
  ],
  "type": "sink"
}

The sink file is working and keeps growing, but the topic restored-exchange-rate-log stays completely empty.


Adding more details.

I have now tried the "Zalando" approach, but we don't use S3; we are using the FileStream connector.

Here is the sink:

{
  "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
  "file": "/tmp/exchange-rate-log.bin",
  "format.include.keys": "true",
  "tasks.max": "1",
  "topics": "exchange-rate-log",
  "format": "binary",
  "value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "name": "bkp-exchange-rate-log"
}

Here is the source:

{
  "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
  "file": "/tmp/exchange-rate-log.bin",
  "format.include.keys": "true",
  "tasks.max": "1",
  "format": "binary",
  "topic": "bin-test-exchange-rate-log",
  "value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "name": "restore-exchange-rate-log"
}

The sink connector looks fine: it generated the file /tmp/exchange-rate-log.bin, which keeps growing, but the source (restore) connector is getting an error:

Caused by: org.apache.kafka.connect.errors.DataException: bin-test-exchange-rate-log error: Not a byte array! [B@761db301
    at com.spredfast.kafka.connect.s3.AlreadyBytesConverter.fromConnectData(AlreadyBytesConverter.java:22)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:269)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 11 more
Out of interest, what are you building here? Using files is generally an anti-pattern, particularly if you already have one topic and want to write to another. docs.confluent.io/current/connect/kafka-connect-replicator/… might be worth a look - Robin Moffatt
You'll want to use bytearray converters to dump and restore. For example, here's how to do it with S3. jobs.zalando.com/tech/blog/backing-up-kafka-zookeeper/… - OneCricketeer
@RobinMoffatt I'm trying to build a backup of my current topics; because of this I'm using txt files so I can version them and recover a certain period of time. - Leonardo Cuquejo
@cricket_007 I have done it using the byte array converters. The sink file now has this kind of content: [B@2e2f84bd [B@6f34b60d [B@254544f8 But when I try to restore, same thing: no data on the topic. Maybe the Zalando way doesn't work with Avro? - Leonardo Cuquejo
I've used the Zalando one fine for recovering Avro... In fact, binary data doesn't care whether it holds Avro, integers, strings, etc... Kafka just stores bytes, so if you want to back up and recover a topic (without worrying about serialization), then it must be binary, which is not intended to be human-readable - OneCricketeer

2 Answers

1 vote

I was able to generate a dump of a topic using the kafka-avro-console-consumer. We are using SSL + Schema Registry.

Here is the command line to generate the dump of a topic:

tpc=exchange-rate-log
SCHEMA_REGISTRY_OPTS="-Djavax.net.ssl.keyStore=. -Djavax.net.ssl.trustStore=. -Djavax.net.ssl.keyStorePassword=. -Djavax.net.ssl.trustStorePassword=." \
kafka-avro-console-consumer \
  --from-beginning --bootstrap-server $CONNECT_BOOTSTRAP_SERVERS \
  --property schema.registry.url=$CONNECT_SCHEMA_REGISTRY_URL \
  --topic $tpc --consumer-property security.protocol=SSL \
  --consumer-property ssl.truststore.location=/etc/ssl/kafkaproducer.truststore.jks \
  --consumer-property ssl.truststore.password=$MYPASS \
  --consumer-property ssl.keystore.location=/etc/ssl/kafkaproducer.keystore.jks \
  --consumer-property ssl.keystore.password=$MYPASS \
  --consumer-property ssl.key.password=$MYPASS \
  --property "key.separator=::-::" \
  --property "schema.id.separator=::_::" \
  --property print.schema.ids=true \
  --timeout-ms 15000 \
  --property "print.key=true" \
  --key-deserializer "org.apache.kafka.common.serialization.StringDeserializer" > $tpc.dump

But I didn't find a way to import it back using the kafka-avro-console-producer, because it doesn't work with non-Avro keys. With this dump file, I could write a Python producer that reads the file and restores the topic; a sketch of that idea is below.
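
As a very rough sketch of that Python producer (not the actual script): it assumes the confluent-kafka[avro] package, the "::-::" / "::_::" separators and print.schema.ids=true from the command above, and placeholder broker and topic names.

import json

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

DUMP_FILE = "exchange-rate-log.dump"            # output of the console consumer above
TARGET_TOPIC = "restored-exchange-rate-log"     # hypothetical restore topic
KEY_SEP, SCHEMA_ID_SEP = "::-::", "::_::"       # separators used in the dump command

registry = SchemaRegistryClient({"url": "http://kafka-schema:8881"})
producer = Producer({"bootstrap.servers": "kafka:9092"})  # placeholder broker
serializers = {}  # schema id -> AvroSerializer, built lazily per schema

with open(DUMP_FILE) as dump:
    for line in dump:
        line = line.rstrip("\n")
        if not line:
            continue
        # Each line looks like: key::-::{"avro": "record as JSON"}::_::<schema id>
        key, rest = line.split(KEY_SEP, 1)
        value_json, schema_id = rest.rsplit(SCHEMA_ID_SEP, 1)
        schema_id = int(schema_id)
        if schema_id not in serializers:
            schema_str = registry.get_schema(schema_id).schema_str
            serializers[schema_id] = AvroSerializer(registry, schema_str)
        # Caveat: the console consumer prints the Avro JSON encoding, so unions
        # and logical types may need extra massaging before json.loads is enough.
        value_bytes = serializers[schema_id](
            json.loads(value_json),
            SerializationContext(TARGET_TOPIC, MessageField.VALUE),
        )
        producer.produce(TARGET_TOPIC, key=key.encode("utf-8"), value=value_bytes)

producer.flush()

Note that by default the AvroSerializer registers the schema under the restore topic's subject (auto.register.schemas=true), which may or may not be what you want.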

0 votes

I'm not entirely sure that Connect File connectors are a good use case for this.

Plus, the Avro converter won't dump the file in a reproducible format. It'll look like Struct{field=value}

If you really want to dump to a file, just run kafka-avro-console-consumer, include the key, pass --key-deserializer as the string one, then redirect the output with > file.txt

To recover, you can try to use the Avro console producer, but there is no string serializer property, so the keys will need to be quoted, I believe, to be passed to the JSON parser

You can test it like so

echo '"1"|{"data":"value"}' | kafka-avro-console-producer...

(you need to set the key.separator property as well)

And feeding it a whole file would look like

kafka-avro-console-producer...  < file.txt 

For this to work in the case where your whole Kafka cluster goes away and you are left with just this file, though, you'll also need to take a backup of your Avro schema (because the Registry's _schemas topic is gone too)
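
One way to take that schema backup, as a minimal sketch: pull every subject's latest schema over the Schema Registry REST API and save it to disk. This assumes the registry URL from the question is reachable and the Python requests package is available; adjust for SSL/auth as needed.

import requests

REGISTRY = "http://kafka-schema:8881"  # registry URL from the question

# Save the latest version of every registered schema to a local .avsc file,
# so the schemas survive even if the _schemas topic is lost.
for subject in requests.get(f"{REGISTRY}/subjects").json():
    latest = requests.get(f"{REGISTRY}/subjects/{subject}/versions/latest").json()
    with open(f"{subject}.avsc", "w") as out:
        out.write(latest["schema"])  # the Avro schema itself, as a JSON string
    print(f"saved {subject} (version {latest['version']}, schema id {latest['id']})")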