
I have created a NiFi flow that eventually publishes JSON records to Kafka with Avro-encoded values and string keys, using a schema in the Confluent Schema Registry for the values. Here is the configuration for the AvroRecordSetWriter in NiFi.

I am now trying to use Kafka Connect (connect-standalone) to move the messages to a PostgreSQL database using JdbcSinkConnector, but am getting the following error: Error retrieving Avro schema for id 1

I have confirmed that I have a schema in my Confluent Schema Registry with an ID of 1. Here are my configs for the Connect task:

Worker Config:

bootstrap.servers=localhost:29092
key.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
rest.host.name=localhost
rest.port=8083
plugin.path=share/java

Connector Config:

name=pg-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=rds
connection.url=jdbc:postgresql://localhost:5432/test
connection.user=postgres
connection.password=xxxxxxxx
insert.mode=upsert
table.name.format=test_data
auto.create=true

I created a flow in NiFi that consumes the messages properly, and I have also successfully consumed messages (which are formatted as JSON in the output) with kafka-avro-console-consumer by specifying --property schema.registry.url=http://schema-registry:8081. Note that I'm running the consumer within a Docker container, which is why the URL is not localhost.

I'm not sure what I am missing. My only thought is that I am using the wrong class for the key converter, but that would not make sense with the given error. Can anyone see what I am doing wrong?

NiFi can write to Postgres as well. Are you sure you need Kafka Connect, rather than doing all the processing in one place? The error message isn't specific to the key or the value; it only says that the ID cannot be found in the registry for one of the possible rds subjects. If you do GET /subjects on the registry, you can see what's available. Also, if you're running Connect in Docker, it's best to use distributed mode, because offsets shouldn't be stored in an ephemeral environment (a container file system). - OneCricketeer
Thanks cricket_007. I had looked at a processor in NiFi (I don’t recall which) for writing to PostgreSQL. I’ll probably go back to that line of attack. It looks like the PutDatabaseRecord processor might get me there. Using the REST API for the registry, I did see the schema with ID=1, so I know it’s in the registry. Thanks for the tip on using distributed mode; I’ll switch over within Docker. - philnug
Not just ID=1: as the answer below points out, the schema needs to be registered under the rds-value subject for the Confluent Avro deserializer and converters to work. - OneCricketeer
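The GET /subjects check suggested above can be taken one step further: walk every subject and version and report which subjects actually hold schema ID 1. This is a minimal Python sketch using only the standard library. It assumes the registry is reachable at http://localhost:8081 (the URL from the worker config) and relies on the Schema Registry REST endpoints GET /subjects, GET /subjects/{subject}/versions and GET /subjects/{subject}/versions/{version}. The helper names are hypothetical.

```python
import json
import urllib.parse
import urllib.request

REGISTRY_URL = "http://localhost:8081"  # assumption: the registry URL from the worker config


def get_json(url: str):
    """GET a Schema Registry endpoint and decode the JSON response."""
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)


def subjects_with_id(schema_id: int, registry_url: str = REGISTRY_URL, fetch=get_json) -> list:
    """Return every subject that has at least one version with the given global schema ID."""
    hits = []
    for subject in fetch(f"{registry_url}/subjects"):
        # Subject names can contain characters that must be escaped in a URL path.
        base = f"{registry_url}/subjects/{urllib.parse.quote(subject, safe='')}/versions"
        for version in fetch(base):
            if fetch(f"{base}/{version}")["id"] == schema_id:
                hits.append(subject)
                break  # one matching version is enough for this subject
    return hits
```

Against a live registry, print(subjects_with_id(1)) would show whether ID 1 sits only under a subject such as rds or also under rds-value. The fetch parameter exists only so the lookup can be exercised without a running registry.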

1 Answer


I don't know much about NiFi, but I see that the name of the schema is "rds", and the error logs say that the subject was not found in the schema registry.

Kafka producers use KafkaAvroSerializer to serialize Avro records, registering the associated Avro schema in the schema registry at the same time. Consumers use KafkaAvroDeserializer to deserialize Avro records, retrieving the associated schema from the schema registry.
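The schema is not embedded in each message. In the Confluent wire format, the serializer prefixes every value with a magic byte (0) and the 4-byte big-endian schema ID it obtained from the registry, followed by the Avro binary body. The deserializer (and Connect's AvroConverter) reads that ID back and asks the registry for the schema, which is where the "id 1" in the error comes from. A minimal Python sketch of reading that header, assuming the NiFi writer framed the values in this format; the function name is hypothetical:

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def parse_confluent_header(message_value: bytes) -> tuple:
    """Split a Confluent-framed Avro message into (schema_id, avro_payload).

    Layout: 1 magic byte (0), a 4-byte big-endian schema ID, then the Avro binary body.
    """
    if len(message_value) < 5:
        raise ValueError("message too short for the Confluent wire format")
    magic, schema_id = struct.unpack(">bI", message_value[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte: {magic}")
    return schema_id, message_value[5:]
```

Running it on the raw bytes of a message from the rds topic should return 1 as the schema ID if the framing matches what the converter expects; a ValueError would instead point at a framing mismatch rather than a registry problem.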

The schema registry stores schemas in categories called "subjects", and by default the subject for a record is named topic_name-value for the value and topic_name-key for the key.
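To make the default naming concrete (Confluent calls it TopicNameStrategy), here is a small Python sketch that derives the subjects the Avro deserializers would look up for a topic. The function names are hypothetical; with a StringConverter on the key and AvroConverter on the value, only the value subject matters.

```python
def topic_name_subject(topic: str, is_key: bool) -> str:
    """Default subject naming: '<topic>-key' for keys, '<topic>-value' for values."""
    return f"{topic}-{'key' if is_key else 'value'}"


def required_subjects(topic: str, key_is_avro: bool, value_is_avro: bool) -> list:
    """Subjects that must exist in the registry for the Avro parts of a topic's records."""
    subjects = []
    if key_is_avro:
        subjects.append(topic_name_subject(topic, is_key=True))
    if value_is_avro:
        subjects.append(topic_name_subject(topic, is_key=False))
    return subjects
```

For the rds topic with string keys and Avro values, required_subjects("rds", key_is_avro=False, value_is_avro=True) gives ["rds-value"].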

In your case, you registered the schema through NiFi rather than through a Kafka serializer, so my guess is that "rds" is (or is part of) the subject name in the schema registry.

How did you verify that your schema was correctly stored?

Normally, in your case the correct subject would be rds-value, because you're using the schema registry only for the record values.
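One way to act on this, sketched in Python under stated assumptions: fetch the existing schema by its ID (GET /schemas/ids/{id}) and register the same schema under rds-value (POST /subjects/{subject}/versions with a {"schema": ...} body and the application/vnd.schemaregistry.v1+json content type). The registry URL and the helper names are assumptions, not part of the original setup.

```python
import json
import urllib.request

REGISTRY_URL = "http://localhost:8081"  # assumption: the registry URL from the worker config
SR_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"


def build_register_request(registry_url: str, subject: str, schema_str: str) -> urllib.request.Request:
    """Build the POST that registers schema_str as a new version of `subject`."""
    body = json.dumps({"schema": schema_str}).encode("utf-8")
    return urllib.request.Request(
        f"{registry_url}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": SR_CONTENT_TYPE},
        method="POST",
    )


def copy_schema_to_subject(schema_id: int, subject: str, registry_url: str = REGISTRY_URL) -> int:
    """Register the schema stored under schema_id in `subject`; return the ID the registry reports."""
    with urllib.request.urlopen(f"{registry_url}/schemas/ids/{schema_id}") as resp:
        schema_str = json.load(resp)["schema"]
    with urllib.request.urlopen(build_register_request(registry_url, subject, schema_str)) as resp:
        return json.load(resp)["id"]
```

Calling copy_schema_to_subject(1, "rds-value") should normally report ID 1 again, since the registry reuses the ID of an identical schema. It's worth checking the returned value, because a different ID would not match the one NiFi already wrote into the messages.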