I've been struggling with this for about a week now trying to get a simple (3 fields) AVRO formated KSQL table as a source to a JDBC connector sink (mysql)
I am getting the following errors (after INFO line):
[2018-12-11 18:58:50,678] INFO Setting metadata for table "DSB_ERROR_TABLE_WINDOWED" to Table{name='"DSB_ERROR_TABLE_WINDOWED"', columns=[Column{'MOD_CLASS', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'METHOD', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'COUNT', isPrimaryKey=false, allowsNull=true, sqlType=BIGINT}]} (io.confluent.connect.jdbc.util.TableDefinitions)
[2018-12-11 18:58:50,679] ERROR WorkerSinkTask{id=dev-dsb-errors-mysql-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: DSB_ERROR_TABLE_WINDOWED
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:127)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:64)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:79)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:124)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:63)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I can tell that the sink is doing something properly as the schema is pulled (see just before the error above) and the table is created successfully in the database with the proper schema:
MariaDB [dsb_errors_ksql]> describe DSB_ERROR_TABLE_WINDOWED;
+-----------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-----------+--------------+------+-----+---------+-------+
| MOD_CLASS | varchar(256) | YES | | NULL | |
| METHOD | varchar(256) | YES | | NULL | |
| COUNT | bigint(20) | YES | | NULL | |
+-----------+--------------+------+-----+---------+-------+
3 rows in set (0.01 sec)
And here is the KTABLE definition:
ksql> describe extended DSB_ERROR_TABLE_windowed;
Name : DSB_ERROR_TABLE_WINDOWED
Type : TABLE
Key field : KSQL_INTERNAL_COL_0|+|KSQL_INTERNAL_COL_1
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : AVRO
Kafka topic : DSB_ERROR_TABLE_WINDOWED (partitions: 4, replication: 1)
Field | Type
---------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
MOD_CLASS | VARCHAR(STRING)
METHOD | VARCHAR(STRING)
COUNT | BIGINT
---------------------------------------
Queries that write into this TABLE
-----------------------------------
CTAS_DSB_ERROR_TABLE_WINDOWED_37 : create table DSB_ERROR_TABLE_windowed with (value_format='avro') as select mod_class, method, count(*) as count from DSB_ERROR_STREAM window session ( 60 seconds) group by mod_class, method having count(*) > 0;
There is an entry auto generated in the schema registry for this table (but no key entry):
{
"subject": "DSB_ERROR_TABLE_WINDOWED-value",
"version": 7,
"id": 143,
"schema": "{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"MOD_CLASS\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"METHOD\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"COUNT\",\"type\":[\"null\",\"long\"],\"default\":null}]}"
}
and here is the Connect Worker definition:
{ "name": "dev-dsb-errors-mysql-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "DSB_ERROR_TABLE_WINDOWED",
"connection.url": "jdbc:mysql://os-compute-d01.maeagle.corp:32692/dsb_errors_ksql?user=xxxxxx&password=xxxxxx",
"auto.create": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://kafka-d01.maeagle.corp:8081",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
My understanding (which could be wrong) is that KSQL should be creating the appropriate AVRO schemas in the Schema Registry and Kafka Connect should be able to read those back properly. As I noted above, something is working as the appropriate table is being generated in Mysql, although I am surprised that there is not a key field created...
Most of the posts and examples are using JSON as opposed to AVRO so they haven't been particularly useful.
It seems to be at the deserialization portion of reading and inserting of the topic record...
I am at a loss at this point and could use some guidance.
I have also opened a similiar ticket via github:
https://github.com/confluentinc/ksql/issues/2250
Regards,
--John
Key format : STRING
, and KSQL doesn't support Avro keys (yet) anyway. Therefore, no subject is in the Registry for it (since it isn't Avro)... I'm not sure about the error message, but the problem seems to be the JDBC sink, not KSQL – OneCricketeer