I've been struggling with this for about a week now, trying to get a simple (3-field) Avro-formatted KSQL table flowing as a source into a JDBC sink connector (MySQL).

I am getting the following error (everything after the INFO line):

[2018-12-11 18:58:50,678] INFO Setting metadata for table "DSB_ERROR_TABLE_WINDOWED" to Table{name='"DSB_ERROR_TABLE_WINDOWED"', columns=[Column{'MOD_CLASS', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'METHOD', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'COUNT', isPrimaryKey=false, allowsNull=true, sqlType=BIGINT}]} (io.confluent.connect.jdbc.util.TableDefinitions)

[2018-12-11 18:58:50,679] ERROR WorkerSinkTask{id=dev-dsb-errors-mysql-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: DSB_ERROR_TABLE_WINDOWED
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:127)
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:64)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:79)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:124)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:63)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

I can tell that the sink is doing something right, since the schema is pulled (see the INFO line just before the error above) and the table is created successfully in the database with the proper schema:

MariaDB [dsb_errors_ksql]> describe  DSB_ERROR_TABLE_WINDOWED;
+-----------+--------------+------+-----+---------+-------+
| Field     | Type         | Null | Key | Default | Extra |
+-----------+--------------+------+-----+---------+-------+
| MOD_CLASS | varchar(256) | YES  |     | NULL    |       |
| METHOD    | varchar(256) | YES  |     | NULL    |       |
| COUNT     | bigint(20)   | YES  |     | NULL    |       |
+-----------+--------------+------+-----+---------+-------+
3 rows in set (0.01 sec)

And here is the KTABLE definition:

ksql> describe extended DSB_ERROR_TABLE_windowed;

Name                 : DSB_ERROR_TABLE_WINDOWED
Type                 : TABLE
Key field            : KSQL_INTERNAL_COL_0|+|KSQL_INTERNAL_COL_1
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : AVRO
Kafka topic          : DSB_ERROR_TABLE_WINDOWED (partitions: 4, replication: 1)

 Field     | Type
---------------------------------------
 ROWTIME   | BIGINT           (system)
 ROWKEY    | VARCHAR(STRING)  (system)
 MOD_CLASS | VARCHAR(STRING)
 METHOD    | VARCHAR(STRING)
 COUNT     | BIGINT
---------------------------------------

Queries that write into this TABLE
-----------------------------------
CTAS_DSB_ERROR_TABLE_WINDOWED_37 : create table DSB_ERROR_TABLE_windowed  with (value_format='avro') as  select mod_class, method, count(*) as count  from DSB_ERROR_STREAM window session ( 60 seconds) group by mod_class, method   having count(*) > 0;

There is an auto-generated entry in the Schema Registry for this table's value (but no key subject):

{
    "subject": "DSB_ERROR_TABLE_WINDOWED-value",
    "version": 7,
    "id": 143,
    "schema": "{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"MOD_CLASS\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"METHOD\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"COUNT\",\"type\":[\"null\",\"long\"],\"default\":null}]}"
}
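
For reference, the Registry's subjects can be listed via its REST API (using the schema.registry.url from the connector config below); per the above, a DSB_ERROR_TABLE_WINDOWED-value subject is present, but no DSB_ERROR_TABLE_WINDOWED-key:

curl -s http://kafka-d01.maeagle.corp:8081/subjects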

And here is the connector definition (as posted to the Connect worker):

{ "name": "dev-dsb-errors-mysql-sink",
   "config": { 
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "DSB_ERROR_TABLE_WINDOWED", 
        "connection.url": "jdbc:mysql://os-compute-d01.maeagle.corp:32692/dsb_errors_ksql?user=xxxxxx&password=xxxxxx",
        "auto.create": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://kafka-d01.maeagle.corp:8081",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter"
   }       
}

My understanding (which could be wrong) is that KSQL should be creating the appropriate Avro schemas in the Schema Registry, and Kafka Connect should be able to read them back. As I noted above, something is partially working, since the table is generated in MySQL with the correct schema, although I am surprised that no key field is created...

Most of the posts and examples I have found use JSON rather than Avro, so they haven't been particularly useful.

The failure seems to happen during deserialization, when the connector reads the topic record and tries to insert it...

I am at a loss at this point and could use some guidance.

I have also opened a similar issue on GitHub:

https://github.com/confluentinc/ksql/issues/2250

Regards,

--John

Key format is STRING, and KSQL doesn't support Avro keys (yet) anyway, so there is no subject in the Registry for the key (since it isn't Avro). I'm not sure about the error message, but the problem seems to be the JDBC sink, not KSQL. – OneCricketeer
The issue seems to be related to github.com/confluentinc/ksql/issues/2233: "A windowed key consists of a key and a window start time. TimeWindowedSerde serializes these into a byte buffer, where the first x bytes are the key as serialized by the key serde (in this case StringSerde), and the last 8 bytes are the window start time, as a long." That would explain why the error occurs. If I build the same table without the windowing, I can successfully use the connector to import into MySQL. – John Fortin

1 Answer

As John notes in the comments above, the key in the topic's records is not a plain string: it is the string key followed by a serialized 64-bit integer representing the window start time.

Connect does not ship with an SMT that can handle the windowed key format. However, it would be possible to write one that strips off the trailing window start time and returns just the natural key. You could then put it on the classpath and reference it in your connector config.
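
Here is a minimal sketch of such an SMT, assuming the key layout John describes above (the natural key bytes followed by an 8-byte window start time) and assuming the connector's key.converter is switched to org.apache.kafka.connect.converters.ByteArrayConverter so the transform sees the raw key bytes. The class and package names are hypothetical:

package com.example;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical SMT: strips the trailing 8-byte window start time from a
// windowed key and replaces the record key with the natural string key.
public class StripWindowedKey<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        if (!(record.key() instanceof byte[])) {
            return record; // expects raw bytes, i.e. ByteArrayConverter on the key
        }
        byte[] bytes = (byte[]) record.key();
        if (bytes.length <= 8) {
            return record; // too short to be a windowed key; pass through
        }
        // Per the layout described above: everything except the last 8 bytes
        // is the key as written by the inner key serde (StringSerde here).
        String naturalKey = new String(bytes, 0, bytes.length - 8, StandardCharsets.UTF_8);
        return record.newRecord(record.topic(), record.kafkaPartition(),
                Schema.STRING_SCHEMA, naturalKey,
                record.valueSchema(), record.value(), record.timestamp());
    }

    @Override
    public ConfigDef config() { return new ConfigDef(); }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}

You would then add something like "transforms": "stripWindow" and "transforms.stripWindow.type": "com.example.StripWindowedKey" to the connector config (again, the names are hypothetical).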

If you require the window start time in the database, you can instead update your KSQL query to include the window start time as a field in the value.
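
For example, a sketch of the rewritten CTAS, assuming your KSQL version provides the WindowStart() function:

create table DSB_ERROR_TABLE_windowed with (value_format='avro') as
  select mod_class,
         method,
         WindowStart() as window_start,
         count(*) as count
  from DSB_ERROR_STREAM window session (60 seconds)
  group by mod_class, method
  having count(*) > 0;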