I have created a NiFi flow that eventually publishes json records as records with Avro encoded values and string keys, using a schema in Confluent Registry for the value schema. Here is the configuration for the AvroRecordSetWriter in NiFi.
I am now trying to use Kafka Connect (connect-standalone) to move the messages to a PostgreSQL database using JdbcSinkConnector, but am getting the following error: Error retrieving Avro schema for id 1
I have confirmed that I have a schema in my Confluent Registry with and ID of 1. Following are my configs for the Connect task
Worker Config:
bootstrap.servers=localhost:29092
key.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
rest.host.name=localhost
rest.port=8083
plugin.path=share/java
Connector Config:
name=pg-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=rds
connection.url=jdbc:postgresql://localhost:5432/test
connection.user=postgres
connection.password=xxxxxxxx
insert.mode=upsert
table.name.format=test_data
auto.create=true
I created a flow in NiFi that consumes the messages properly and I have also successfully consumed messages (which are formated as JSON in the output) with kafka-avro-console-consumer by specifying --property schema.registry.url=http://schema-registry:8081. Note that I'm running the consumer within a Docker container, and that is why the url is not localhost.
I'm not sure what I am missing. My only thought is that I am using the wrong class for the key converter, but that would not make sense with the given error. Can anyone see what I am doing wrong?
rdssubjects... If you doGET /subjectson the registry, you might see what's available. Also, if you're using Connect in Docker it's best to use Distributed mode because offsets shouldn't be stored in an ephemeral environment (a container file system) - OneCricketeerrds-valuefor the Confluent Avro deserialization and converters to work - OneCricketeer