
I send data with a key schema and a value schema to a Kafka topic like this:

./bin/kafka-avro-console-producer \
  --broker-list 10.0.0.0:9092 --topic orders \
  --property parse.key="true" \
  --property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
  --property key.separator="$" \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":["null","int"],"default": null},{"name":"product","type": ["null","string"],"default": null}, {"name":"quantity", "type":  ["null","int"],"default": null}, {"name":"price","type":  ["null","int"],"default": null}]}' \
  --property schema.registry.url=http://10.0.0.0:8081

Then I consume this data from Kafka with this sink connector configuration:

{
  "name": "jdbc-oracle",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
    "connection.user": "[redact]",
    "connection.password": "[redact]",
    "auto.create": "true",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "insert.mode": "upsert",
    "name": "jdbc-oracle"
  },
  "tasks": [
    {
      "connector": "jdbc-oracle",
      "task": 0
    }
  ],
  "type": "sink"
}
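For reference, a configuration like this is registered through the Kafka Connect REST API. Only the "name" and "config" fields are submitted; the "tasks" and "type" fields are returned by Connect, not sent. The host and port below are assumptions; adjust them for your environment:

```shell
# Register the sink connector (Connect REST API assumed at localhost:8083).
# Connection credentials are omitted here.
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
        "name": "jdbc-oracle",
        "config": {
          "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
          "tasks.max": "1",
          "topics": "orders",
          "connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
          "auto.create": "true",
          "delete.enabled": "true",
          "pk.mode": "record_key",
          "pk.fields": "id",
          "insert.mode": "upsert"
        }
      }'
```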

But I want to send just JSON to Kafka, without a value schema. If I put only this JSON data on the Kafka topic:

{"id":9}${"id": {"int":9}, "product": {"string":"Yağız Gülbahar"}, "quantity": {"int":1071}, "price": {"int":61}}

how can I read this data from Kafka and write it to Oracle with the Confluent JDBC Sink connector?

Can I define the schema on the Kafka Connect side instead?

And another thing: can I read two different types of data from one Kafka topic and have them go to two different tables on the Oracle side with the JDBC Sink?

You mean, how to produce Avro messages without providing the schema for the payload? – Giorgos Myrianthous

plugin.path doesn't belong in the connector config – OneCricketeer

1 Answer


If you have a source topic with JSON data that has no schema declared, you must add that schema before you can use the JDBC Sink.

Options include:

  1. ksqlDB, as shown here: https://www.youtube.com/watch?v=b-3qN_tlYR4&t=981s
  2. Kafka Connect's Single Message Transform (SMT) capabilities. No SMT that does this ships with Apache Kafka, but there are prototypes out there that could.
  3. Other stream processing e.g. Kafka Streams
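As a sketch of option 1, ksqlDB can declare a schema over the schemaless JSON topic and re-serialize it as Avro, so the schema lands in the Schema Registry and the JDBC Sink can consume it. The stream names and the target topic name below are assumptions; the column list mirrors the fields in the question:

```sql
-- Declare a schema over the existing schemaless JSON topic
CREATE STREAM ORDERS_JSON (ID INT KEY, PRODUCT VARCHAR, QUANTITY INT, PRICE INT)
  WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON');

-- Re-serialize to Avro; the schema is registered in the Schema Registry
CREATE STREAM ORDERS_AVRO
  WITH (KAFKA_TOPIC='orders_avro', VALUE_FORMAT='AVRO') AS
  SELECT * FROM ORDERS_JSON;
```

The JDBC Sink connector would then be pointed at the orders_avro topic instead of orders.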

Edit:

I mean can I define two different jdbc sink to different oracle tables from one kafka topic

Yes, a single topic can be consumed by multiple sink connectors. The table.name.format configuration option can be used to route the topic to a different table name in each sink.
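For example, two sinks can read the same orders topic and write to different tables. This is a sketch: the connector names and table names are assumptions, and the remaining connection settings (URL, credentials, pk.mode, etc.) are omitted for brevity:

```json
{
  "name": "jdbc-oracle-a",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "orders",
    "table.name.format": "ORDERS_A"
  }
}
```

The second connector would be identical apart from its "name" and a different "table.name.format", e.g. "ORDERS_B". Note that both sinks receive every record on the topic; splitting records by type into different tables would additionally need filtering, e.g. via an SMT or stream processing.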