3 votes

I am using a Confluent-managed Kafka cluster with the Schema Registry service, and I am trying to process Debezium messages in a Flink job. The job is configured to use the Table & SQL connectors with the Confluent Avro format. However, the job cannot connect to the Schema Registry and fails with a 401 error.

Table connector configuration

tEnv.executeSql("CREATE TABLE flink_test_1 (\n" +
        "    ORDER_ID STRING,\n" +
        "    ORDER_TYPE STRING,\n" +
        "    USER_ID STRING,\n" +
        "    ORDER_SUM BIGINT\n" +
        ") WITH (\n" +
        "    'connector' = 'kafka',\n" +
        "    'topic'     = 'flink_test_1',\n" +
        "    'scan.startup.mode' = 'earliest-offset',\n" +
        "    'format' = 'avro-confluent',\n" + 
        "    'avro-confluent.schema-registry.url' = 'https://<SR_ENDPOINT>',\n" +
        "    'avro-confluent.schema-registry.subject' = 'flink_test_1-value',\n" +
        "    'properties.basic.auth.credentials.source' = 'USER_INFO',\n" +
        "    'properties.basic.auth.user.info' = '<SR_API_KEY>:<SR_API_SECRET>',\n" +
        "    'properties.bootstrap.servers' = '<CLOUD_BOOTSTRAP_SERVER_ENDPOINT>:9092',\n" +
        "    'properties.security.protocol' = 'SASL_SSL',\n" +
        "    'properties.ssl.endpoint.identification.algorithm' = 'https',\n" +
        "    'properties.sasl.mechanism' = 'PLAIN',\n" +
        "    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule   required username=\"<CLUSTER_API_KEY>\"   password=\"<CLUSTER_API_SECRET>\";'\n" +
        ")"); 

Error Message

Caused by: java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: java.io.IOException: Could not find schema with id 100256 in registry
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:77)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:70)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
    ... 9 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:660)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:642)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)
    at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.getById(SchemaRegistryClient.java:64)
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:74)
    ... 11 more

I successfully tested the connection to the Schema Registry with:

curl -u <SR_API_KEY>:<SR_API_SECRET> https://<SR_ENDPOINT>

The error message "io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401" seems to indicate that <SR_API_KEY>:<SR_API_SECRET> are never passed to the Confluent Schema Registry.

I checked the documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro-confluent.html, where only three format options are described (format, avro-confluent.schema-registry.url, and avro-confluent.schema-registry.subject), and there is no option for specifying SR_API_KEY and SR_API_SECRET.

I can't figure out how to connect to a secured Schema Registry from a Flink program. Is this kind of connection supported by Flink? Does anyone know what the correct configuration should look like?

Thanks.

1
You might get a more direct answer by opening an issue with the Flink devs, or by searching the source code to see whether other properties can be passed. – OneCricketeer
Not quite sure if this helps (it is valid for Kafka Connect, not the Flink Table API, but it might be consistent): put your credentials in the schema.registry.basic.auth.user.info property, in the same colon-separated format. – kopaka
Same issue here. Have you figured it out in the meantime? – Stefan

1 Answer

0 votes

I ran into the same issue. After some investigation, I found a Jira ticket about it. If you can't upgrade your Flink version, you can work around it by first consuming the data with the DataStream API and then converting the stream to a Table; see the sketch below.
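
Here is a minimal sketch of that workaround, assuming Flink 1.12 with the flink-avro-confluent-registry and flink-connector-kafka dependencies, and assuming your release has the ConfluentRegistryAvroDeserializationSchema.forGeneric overload that accepts a map of registry client configs (that map is what carries the basic-auth credentials). The reader schema, group id, and all <...> placeholders are illustrative and must be replaced with your actual values:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class SecureRegistryWorkaround {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Reader schema matching the table columns (assumed; use the subject's real schema).
        Schema readerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"flink_test_1\",\"fields\":["
                + "{\"name\":\"ORDER_ID\",\"type\":\"string\"},"
                + "{\"name\":\"ORDER_TYPE\",\"type\":\"string\"},"
                + "{\"name\":\"USER_ID\",\"type\":\"string\"},"
                + "{\"name\":\"ORDER_SUM\",\"type\":\"long\"}]}");

        // Schema Registry client configs carrying the basic-auth credentials.
        Map<String, String> registryConfigs = new HashMap<>();
        registryConfigs.put("basic.auth.credentials.source", "USER_INFO");
        registryConfigs.put("basic.auth.user.info", "<SR_API_KEY>:<SR_API_SECRET>");

        // The forGeneric overload that forwards configs to the registry client.
        ConfluentRegistryAvroDeserializationSchema<GenericRecord> deserializer =
            ConfluentRegistryAvroDeserializationSchema.forGeneric(
                readerSchema, "https://<SR_ENDPOINT>", registryConfigs);

        // Same Kafka/SASL settings as in the table definition, minus the 'properties.' prefix.
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "<CLOUD_BOOTSTRAP_SERVER_ENDPOINT>:9092");
        kafkaProps.setProperty("security.protocol", "SASL_SSL");
        kafkaProps.setProperty("sasl.mechanism", "PLAIN");
        kafkaProps.setProperty("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";");
        kafkaProps.setProperty("group.id", "flink_test_1_consumer"); // hypothetical group id

        FlinkKafkaConsumer<GenericRecord> consumer =
            new FlinkKafkaConsumer<>("flink_test_1", deserializer, kafkaProps);
        consumer.setStartFromEarliest();

        // Convert GenericRecord to Row so the Table API sees named, typed columns.
        DataStream<Row> rows = env.addSource(consumer)
            .map(record -> Row.of(
                record.get("ORDER_ID").toString(),
                record.get("ORDER_TYPE").toString(),
                record.get("USER_ID").toString(),
                (Long) record.get("ORDER_SUM")))
            .returns(Types.ROW_NAMED(
                new String[] {"ORDER_ID", "ORDER_TYPE", "USER_ID", "ORDER_SUM"},
                Types.STRING, Types.STRING, Types.STRING, Types.LONG));

        // The stream is now queryable through the Table API / SQL.
        tEnv.createTemporaryView("flink_test_1", rows);
        tEnv.executeSql("SELECT * FROM flink_test_1").print();
    }
}

The key difference from the Table connector is that the registry client configs (basic.auth.credentials.source and basic.auth.user.info) are handed directly to the deserialization schema, so they actually reach the Schema Registry client.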
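
For reference, later Flink releases added dedicated authentication options to the avro-confluent format itself (I believe starting with 1.13; verify the exact option names against the documentation for your version), which should make the original DDL work without the DataStream detour:

'format' = 'avro-confluent',
'avro-confluent.schema-registry.url' = 'https://<SR_ENDPOINT>',
'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
'avro-confluent.basic-auth.user-info' = '<SR_API_KEY>:<SR_API_SECRET>',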