
I am trying to migrate a schema registry from a local cluster to Confluent Cloud using Replicator, as specified in the tutorial. It manages to copy all subjects except a few, and I cannot figure out why. It spits out the following error:

ERROR Failed to translate schema registry record (io.confluent.connect.replicator.schemas.SchemaTranslator:188)
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Error while registering schema; error code: 500; error code: 500
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:495)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:486)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:481)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:212)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:267)
    at io.confluent.connect.replicator.schemas.SchemaTranslator.translateSchema(SchemaTranslator.java:211)
    at io.confluent.connect.replicator.schemas.SchemaTranslator.translateSchemas(SchemaTranslator.java:166)
    at io.confluent.connect.replicator.schemas.SchemaTranslator.translateCollectedRecords(SchemaTranslator.java:135)
    at io.confluent.connect.replicator.ReplicatorSourceTask.translateCollectedRecords(ReplicatorSourceTask.java:566)
    at io.confluent.connect.replicator.ReplicatorSourceTask.poll(ReplicatorSourceTask.java:558)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

The schemas that it does not manage to copy do not look any different from those that it does copy.

How can I debug it?

I am using Confluent platform 6.0.0.

connect-standalone.properties:


#value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter you want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Store offsets on local filesystem
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=5000

ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="****" password="****";
security.protocol=SASL_SSL

consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="****" password="****";
consumer.security.protocol=SASL_SSL
consumer.offset.start=earliest

producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.request.timeout.ms=20000
# x4
producer.retry.backoff.ms=500
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="****" password="****";
producer.security.protocol=SASL_SSL

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

quickstart-replicator.properties:

# basic connector configuration
name=replicator-v28
connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector

key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
value.converter=io.confluent.connect.replicator.util.ByteArrayConverter
header.converter=io.confluent.connect.replicator.util.ByteArrayConverter

tasks.max=4

# source cluster connection info
src.kafka.bootstrap.servers=localhost:9092

# destination cluster connection info
dest.kafka.ssl.endpoint.identification.algorithm=https
dest.kafka.sasl.mechanism=PLAIN
dest.kafka.request.timeout.ms=20000
dest.kafka.bootstrap.servers=<kafka-cloud-url>:9092
retry.backoff.ms=500
dest.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="****" password="****";
dest.kafka.security.protocol=SASL_SSL

# Schema Registry migration topics to replicate from source to destination
# topic.whitelist indicates which topics are of interest to replicator
topic.whitelist=_schemas
# schema.registry.topic indicates which of the topics in the ``whitelist`` contains schemas
schema.registry.topic=_schemas

# Connection settings for destination Confluent Cloud Schema Registry
schema.registry.url=https://<cloud-schema-registry-url>
schema.registry.client.basic.auth.credentials.source=USER_INFO
schema.registry.client.basic.auth.user.info=****:****
confluent.topic.replication.factor=3

1 Answer


As the log states, the problem is that the client could not register a schema (publish a subject) in the destination schema registry.

The way to debug it is to enable DEBUG logging for Replicator. For instance, in ~/confluent-6.0.0/etc/kafka/connect-log4j.properties change the root logger line to:

log4j.rootLogger=DEBUG, stdout, connectAppender

Then restart the Replicator and write the logs to a file, like this:

./connect-standalone.sh ../etc/kafka/connect-standalone.properties ../etc/kafka-connect-replicator/quickstart-replicator.properties | tee ../logs/debug.txt
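With DEBUG enabled the output is verbose, so grep can help narrow it down. A rough sketch (the exact DEBUG message wording depends on the client version, so treat the patterns as assumptions to adjust against your own log):

# show the registration failures with some surrounding context
grep -n -B 5 -A 5 "error code: 500" ../logs/debug.txt

# the REST client logs the requests it sends, so the failing subject usually
# shows up near the /subjects/<name>/versions URL it POSTs to
grep -n "subjects" ../logs/debug.txt | grep -i "versions"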

In debug.txt you can find which subject could not be POSTed to the schema registry, and then try to register it manually.
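For the manual attempt, the Schema Registry REST API accepts a POST to the subject's versions endpoint. A minimal sketch, assuming the Confluent Cloud API key/secret from quickstart-replicator.properties, a placeholder subject, and that the source registry runs on its default localhost:8081:

# fetch the schema to register from the source registry first
curl http://localhost:8081/subjects/<failing-subject>/versions/latest

# try to register it in the destination registry; replace the schema body
# with the one returned above
curl -u <sr-api-key>:<sr-api-secret> \
  -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"Example\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}"}' \
  https://<cloud-schema-registry-url>/subjects/<failing-subject>/versions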

Most likely, it happens because the destination schema registry already contains a subject with the same ID. Note that if you manually delete a subject, it still persists in the schema registry as a soft delete; it is just no longer visible. To get rid of a subject completely, you need to do a "hard delete" of the subject.
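A sketch of checking for and hard-deleting a leftover subject, using the same placeholder URL and credentials as above; the permanent=true flag is what performs the hard delete, and it only works after a regular (soft) delete:

# list subjects, including soft-deleted ones that are no longer visible
curl -u <sr-api-key>:<sr-api-secret> "https://<cloud-schema-registry-url>/subjects?deleted=true"

# soft delete first, then hard delete with permanent=true
curl -u <sr-api-key>:<sr-api-secret> -X DELETE "https://<cloud-schema-registry-url>/subjects/<subject>"
curl -u <sr-api-key>:<sr-api-secret> -X DELETE "https://<cloud-schema-registry-url>/subjects/<subject>?permanent=true"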