I am using Confluent managed Kafka cluster, Schema Registry service and trying to process Debezium messages in a Flink job. The job is configured to use Table & SQL Connectors and Confluent Avro Format. However the job is not able to connect to Schema Registry and raises 401 error.
Table Connector configurations
tEnv.executeSql("CREATE TABLE flink_test_1 (\n" +
" ORDER_ID STRING,\n" +
" ORDER_TYPE STRING,\n" +
" USER_ID STRING,\n" +
" ORDER_SUM BIGINT\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'flink_test_1',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'avro-confluent',\n" +
" 'avro-confluent.schema-registry.url' = 'https://<SR_ENDPOINT>',\n" +
" 'avro-confluent.schema-registry.subject' = 'flink_test_1-value',\n" +
" 'properties.basic.auth.credentials.source' = 'USER_INFO',\n" +
" 'properties.basic.auth.user.info' = '<SR_API_KEY>:<SR_API_SECRET>',\n" +
" 'properties.bootstrap.servers' = '<CLOUD_BOOTSTRAP_SERVER_ENDPOINT>:9092',\n" +
" 'properties.security.protocol' = 'SASL_SSL',\n" +
" 'properties.ssl.endpoint.identification.algorithm' = 'https',\n" +
" 'properties.sasl.mechanism' = 'PLAIN',\n" +
" 'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";'\n" +
")");
Error Message
Caused by: java.io.IOException: Failed to deserialize Avro record.
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: java.io.IOException: Could not find schema with id 100256 in registry
at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:77)
at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:70)
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
... 9 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:660)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:642)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)
at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.getById(SchemaRegistryClient.java:64)
at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:74)
... 11 more
I successfully tested the connection to Schema Registry by:
curl -u <SR_API_KEY>:<SR_API_SECRET> https://<SR_ENDPOINT>
It seem like the error message "io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401" clearly says that <SR_API_KEY>:<SR_API_SECRET> were not passed to the Confluent Schema Registry.
I checked the documentation here https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro-confluent.html, where only 3 Format options described: ["format", "avro-confluent.schema-registry.url", "avro-confluent.schema-registry.subject"] and no options for specifying SR_API_KEY and SR_API_SECRET.
I can't figure out how to successfully connect to the secure schema registry from the Flink program. Is this connection type supported by Flink? Does anyone know what the correct connection configuration should look like?
Thanks.
schema.registry.basic.auth.user.info
in the same format with colon – kopaka