
I'm trying to sink data directly from KSQL into InfluxDB (or any other connector that requires type definitions). I can get things working in the simple case, but I run into trouble when the schema requires complex types (i.e., tags for InfluxDB).

Here's an example of my stream/schema:

 Field    | Type                                                   
-------------------------------------------------------------------
 ROWKEY   | VARCHAR(STRING)  (primary key)
 FIELD_1  | VARCHAR(STRING)                                        
 FIELD_2  | VARCHAR(STRING)                                        
 FIELD_3  | VARCHAR(STRING)                                        
 FIELD_4  | DOUBLE                                                 
 TAGS     | MAP<STRING, VARCHAR(STRING)> 

If I manually create an AVRO schema and populate the records from a simple producer, I can get through the getting started guide here and embed the tags for InfluxDB.
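For reference, the hand-written Avro schema I use in that case looks roughly like this (a sketch, expressed here as a Python dict; field names match the stream above, and the key point is that TAGS is declared as an Avro map of strings, which is the shape the InfluxDB sink expects for tags):

```python
import json

# Hypothetical hand-written Avro schema matching the stream above.
# TAGS is an Avro *map* type, which the InfluxDB sink can consume as tags.
schema = {
    "type": "record",
    "name": "InfluxRecord",
    "fields": [
        {"name": "FIELD_1", "type": "string"},
        {"name": "FIELD_2", "type": "string"},
        {"name": "FIELD_3", "type": "string"},
        {"name": "FIELD_4", "type": "double"},
        {"name": "TAGS", "type": {"type": "map", "values": "string"}},
    ],
}

print(json.dumps(schema, indent=2))
```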

However, when I move to KSQL, if I try to sink the AVRO stream directly into InfluxDB, I lose information on the complex types (tags). I notice the warning from this blog post, "Warning ksqlDB/KSQL cannot yet write data in an Avro format that is compatible with this connector"

Next, I try converting the AVRO stream into JSON format, but now I understand that I would have to specify the schema in each record, similar to what this question poses. I haven't been able to convert an AVRO stream into a JSON stream that embeds both the schema and the payload.
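To illustrate what the connector needs: when the value converter is `org.apache.kafka.connect.json.JsonConverter` with `value.converter.schemas.enable=true`, every record must carry a schema envelope alongside the payload. A sketch of that envelope (field names from the stream above, values made up):

```python
import json

# Kafka Connect JSON envelope: the schema travels with every record
# when JsonConverter is used with schemas.enable=true.
record = {
    "schema": {
        "type": "struct",
        "optional": False,
        "fields": [
            {"field": "FIELD_1", "type": "string", "optional": True},
            {"field": "FIELD_4", "type": "double", "optional": True},
            {
                "field": "TAGS",
                "type": "map",
                "keys": {"type": "string", "optional": False},
                "values": {"type": "string", "optional": True},
                "optional": True,
            },
        ],
    },
    "payload": {
        "FIELD_1": "some value",
        "FIELD_4": 42.0,
        "TAGS": {"host": "server-1", "region": "us-east"},
    },
}

print(json.dumps(record))
```

Repeating the schema in every message is what makes this approach so verbose, which is why producing this shape out of KSQL directly is the sticking point.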

Finally, I see the "jiggling solution" with kafkacat, but this would force me to dump records out from KSQL into kafkacat, and then back into Kafka before finally arriving at Influx.

Is there a method to sink complex records directly from KSQL in either JSON or AVRO format into a connector?


1 Answer


I would imagine the reason ksqlDB can't yet output Avro data in the format InfluxDB requires is that it doesn't serialize the TAGS field as an Avro map type: Avro maps require non-null keys, while the SQL MAP<STRING, STRING> type allows null keys. Hence ksqlDB serializes the map as an Avro array of key-value entries.
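Concretely, the two serialized shapes differ like this (illustrative values only):

```python
# What the InfluxDB sink expects: tags as a map (a JSON object).
tags_as_map = {"host": "server-1", "region": "us-east"}

# What ksqlDB actually emits for MAP<STRING, STRING>: an array of
# key/value entries, because an Avro map can't represent a null key.
tags_as_entries = [
    {"key": "host", "value": "server-1"},
    {"key": "region", "value": "us-east"},
]

# The information is equivalent, but the connector can't consume the
# second shape as tags.
assert {e["key"]: e["value"] for e in tags_as_entries} == tags_as_map
```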

To get something working with Avro you'll need either:

  1. Support for non-null types: https://github.com/confluentinc/ksql/issues/4436, or
  2. Support for using existing Avro schema: https://github.com/confluentinc/ksql/issues/3634

Please feel free to up-vote / comment on these issues to raise their profiles.

Previously, a JSON-based solution would not have worked because, as you've pointed out, the connector requires the JSON schema embedded in the payload. However, the most recent version of Confluent Platform / Schema Registry supports JSON schemas in the Schema Registry. Hence, while I haven't tried it, upgrading to the latest CP version may mean a JSON-based solution will work. If not, it is probably worth raising a Jira/GitHub ticket to get the appropriate component upgraded for this to work.
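If you do try the JSON Schema route, the sink config would look something like the sketch below. This is an assumption, not a tested config: the converter and connector class names are from the Confluent docs as I recall them, and property names like `influxdb.url`/`influxdb.db` may differ in your connector version.

```json
{
  "name": "influxdb-sink",
  "config": {
    "connector.class": "io.confluent.influxdb.InfluxDBSinkConnector",
    "topics": "MY_KSQL_OUTPUT_TOPIC",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "influxdb.url": "http://influxdb:8086",
    "influxdb.db": "mydb"
  }
}
```

The idea is that `JsonSchemaConverter` looks the schema up in the Schema Registry instead of requiring it to be embedded in every record.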