
Let's say we have the following setup.

Schema evolution compatibility is set to BACKWARD.

The JDBC Source Connector polls data from a database and writes it to a Kafka topic. The HDFS Sink Connector reads messages from the Kafka topic and writes them to HDFS in Avro format.
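For concreteness, the setup above might look roughly like the following standalone connector configs (all names, URLs, and column choices here are placeholders, not taken from the question):

```properties
# --- jdbc-source.properties (hypothetical) ---
name=jdbc-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:postgresql://localhost:5432/mydb
mode=incrementing
incrementing.column.name=col1
topic.prefix=db-
# The Avro converter is what registers the generated schemas in the Schema Registry
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# --- hdfs-sink.properties (hypothetical) ---
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
topics=db-mytable
hdfs.url=hdfs://localhost:9000
format.class=io.confluent.connect.hdfs.avro.AvroFormat
flush.size=1000
# Controls how the sink reconciles records written with different schema versions
schema.compatibility=BACKWARD
```

Note that `schema.compatibility` on the sink is a separate setting from the compatibility level configured in the Schema Registry itself.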

The following is the flow as I understand it.

  1. The JDBC Source Connector queries the DB and generates Schema V1 from the JDBC metadata of the ResultSet. V1 has col1, col2, col3. Schema V1 is registered in the Schema Registry.
  2. The source connector polls data from the DB and writes messages to the Kafka topic with the V1 schema.
  3. (Question 1) When the HDFS Sink Connector reads messages from the topic, does it validate the messages against the V1 schema from the Schema Registry?
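On the mechanics behind Question 1: with the Avro converter, each Kafka message is framed in the Confluent wire format (a 0x00 magic byte followed by a 4-byte big-endian schema ID), and the sink's deserializer fetches exactly the schema that ID points to rather than "validating" against some current version. A minimal sketch of parsing that framing (the payload bytes here are made up):

```python
import struct

def parse_confluent_wire_format(message: bytes):
    """Split a Confluent-framed Kafka message into (schema_id, avro_payload).

    Wire format: 1 magic byte (0x00) + 4-byte big-endian schema ID + Avro body.
    The deserializer looks up the schema ID in the registry to decode the body.
    """
    if len(message) < 5 or message[0] != 0:
        raise ValueError("not in Confluent wire format")
    schema_id = struct.unpack(">I", message[1:5])[0]
    return schema_id, message[5:]

# A framed message whose payload was written with schema ID 42:
schema_id, payload = parse_confluent_wire_format(b"\x00\x00\x00\x00\x2a" + b"avro-bytes")
```

So every record is decoded with the writer schema it was produced with, whatever version that happens to be.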

Next, the DB schema is changed: column "col3" is removed from the table.

  1. The next time the JDBC source polls the DB, it sees that the schema has changed, generates a new Schema V2 (col1, col2), and registers V2 in the Schema Registry.
  2. The source connector continues polling data and writes to the Kafka topic with the V2 schema.
  3. Now the Kafka topic can contain messages in both the V1 and V2 schemas.
  4. (Question 2) When the HDFS Sink Connector reads a message, does it now validate messages against Schema V2?
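The mixed-version situation in step 3 can be pictured like this: each message carries the ID of the schema it was written with, and the consumer resolves every record against that writer schema. A toy sketch (field names from the question, everything else invented for illustration):

```python
# Stand-in for the Schema Registry: writer schemas keyed by schema ID
SCHEMAS = {
    1: ["col1", "col2", "col3"],  # V1
    2: ["col1", "col2"],          # V2, after col3 was dropped
}

def decode(message):
    """Resolve each record against the schema it was written with."""
    schema_id, record = message
    writer_fields = SCHEMAS[schema_id]
    # A real Avro reader decodes bytes here; we just check the field set.
    assert set(record) == set(writer_fields)
    return record

topic = [
    (1, {"col1": 1, "col2": "a", "col3": True}),  # produced before the DDL change
    (2, {"col1": 2, "col2": "b"}),                # produced after col3 was dropped
]
records = [decode(m) for m in topic]
```

The point is that neither message is checked against "the latest" schema on read; each one is self-describing via its schema ID.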

Is this the case addressed in the Confluent documentation under Backward Compatibility? [https://docs.confluent.io/current/schema-registry/avro.html#schema-evolution-and-compatibility]

An example of a backward compatible change is the removal of a field. A consumer that was developed to process events without this field will be able to process events written with the old schema that contain the field – the consumer will just ignore that field.
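The BACKWARD rule quoted above boils down to: data written with the old schema must be readable with the new one, which (simplified) means every field in the new schema must either exist in the old schema or declare a default. A rough sketch of that check, not the registry's actual algorithm (which also resolves types, aliases, etc.):

```python
NO_DEFAULT = object()  # sentinel: field declares no default value

def backward_compatible(new_fields, old_fields):
    """Simplified BACKWARD check: can a reader with the new schema
    decode data written with the old schema?"""
    return all(name in old_fields or default is not NO_DEFAULT
               for name, default in new_fields.items())

v1 = {"col1": NO_DEFAULT, "col2": NO_DEFAULT, "col3": NO_DEFAULT}
v2 = {"col1": NO_DEFAULT, "col2": NO_DEFAULT}  # col3 removed

backward_compatible(v2, v1)  # True: removing a field is backward compatible
backward_compatible(v1, v2)  # False: (re)adding col3 without a default is not
```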


1 Answer


The registry only validates when a new schema is registered.

Therefore, validation happens on the registry side only if/when the source connector detects a change and tries to register a new schema version.

As for the HDFS connector, there is a separate `schema.compatibility` property that applies a projection over the records held in memory and any new records. When the connector receives a record with a new schema and the update is backwards compatible, all messages not yet flushed are projected to the new schema when the Avro container file is written.
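That projection step can be sketched like this (a loose illustration of the idea, not the connector's actual code; field names from the question, everything else assumed):

```python
def flush_with_projection(buffered, latest_fields):
    """Project all buffered records onto the latest schema before writing a file.

    Loosely mimics what schema.compatibility=BACKWARD allows the HDFS sink
    to do: one Avro container file, one schema, with records written under
    older schema versions projected forward (dropped fields disappear).
    """
    return [{f: record.get(f) for f in latest_fields} for record in buffered]

buffered = [
    {"col1": 1, "col2": "a", "col3": True},  # V1 record still in memory
    {"col1": 2, "col2": "b"},                # V2 record triggers the schema upgrade
]
flush_with_projection(buffered, ["col1", "col2"])
# → [{"col1": 1, "col2": "a"}, {"col1": 2, "col2": "b"}]
```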

Aside: just because the registry thinks a change is backwards compatible doesn't guarantee the sink connector agrees... The validation logic in the two code bases is different, and we've had multiple issues with it :/