0
votes

This question is based on the solution path in KSQL stream from topic with heterogeneous JSON structures - I get an error when I try to select from a stream created from a topic with a value_format='json'. I can still print the underlying topic, but doing select * from stream; throws an error.

We're using Confluent 5.3.1 aka Kafka 2.3.1.

Here's how I (re)create the issue...

  1. Start with some simple json data
ksql> PRINT 'sms-reporting.status-updated';
Format:JSON
{"ROWTIME":1573688864403,"ROWKEY":"null","acceptTs":"1573603201000","eventType":"REPORTING","isFinal":"true","messageId":"8619Z-1113H-00007-01CF5","nodeId":"86","responseCode":"3041","submissionAttempt":"1","ts":"1573603201567","type":"statusUpdated"}
  1. Create the stream:
ksql> CREATE STREAM statusupdated ( \
 acceptTs VARCHAR, \
 eventType VARCHAR, \
 isFinal VARCHAR, \
 messageId VARCHAR, \
 nodeId VARCHAR, \
 responseCode VARCHAR, \
 submissionAttempt VARCHAR, \
 ts VARCHAR, \
 type VARCHAR \
) with (kafka_topic='sms-reporting.status-updated',value_format='json');

 Message
----------------
 Stream created
----------------
ksql> DESCRIBE statusupdated;

Name                 : STATUSUPDATED
 Field             | Type
-----------------------------------------------
 ROWTIME           | BIGINT           (system)
 ROWKEY            | VARCHAR(STRING)  (system)
 ACCEPTTS          | VARCHAR(STRING)
 EVENTTYPE         | VARCHAR(STRING)
 ISFINAL           | VARCHAR(STRING)
 MESSAGEID         | VARCHAR(STRING)
 NODEID            | VARCHAR(STRING)
 RESPONSECODE      | VARCHAR(STRING)
 SUBMISSIONATTEMPT | VARCHAR(STRING)
 TS                | VARCHAR(STRING)
 TYPE              | VARCHAR(STRING)
-----------------------------------------------
  1. Attempt (unsuccessfully) to review the contents of the stream:
ksql> SELECT * FROM statusupdated;
Query terminated
java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
 at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000","REPORTING","true","8619Z-1113H-00007-01CF5","86","3041","1","1573603201567","statusUpdated"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 206] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
 at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000","REPORTING","true","8619Z-1113H-00007-01CF5","86","3041","1","1573603201567","statusUpdated"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 206] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
Caused by: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
 at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000","REPORTING","true","8619Z-1113H-00007-01CF5","86","3041","1","1573603201567","statusUpdated"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 206] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])

If I now use this stream (on which I still cannot execute a select statement without receiving an error) to create another stream (which will, of course, be backed by a new topic), I still can't select on the new stream, but I can print the new backing topic and the data appears as expected.

  1. Create a derived stream
ksql> CREATE STREAM statusupdated2 AS SELECT acceptts FROM statusupdated;

 Message
----------------------------
 Stream created and running
----------------------------
ksql> describe statusupdated2;

Name                 : STATUSUPDATED2
 Field    | Type
--------------------------------------
 ROWTIME  | BIGINT           (system)
 ROWKEY   | VARCHAR(STRING)  (system)
 ACCEPTTS | VARCHAR(STRING)
--------------------------------------
  1. Attempt (unsuccessfully) to review the contents of the derived stream:
ksql> SELECT * FROM statusupdated2;
Query terminated
java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
 at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 113] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
 at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 113] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
Caused by: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
 at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 113] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
  1. print the backing topic of the new stream:
ksql> PRINT statusupdated2;
Format:JSON
{"ROWTIME":1573692743435,"ROWKEY":"null","ACCEPTTS":"1573603201000"}
1
This seems to me like you may be using a mismatching version of the CLI and your server. We removed the terminal field from the message that is being sent from the server. We also added @JsonIgnoreProperties to make sure that any future CLIs reading the old data would just ignore it. When you startup KSQL, what does it say as the CLI/Server version on the splash screen?Almog
I shall investigate.Nate Rose

1 Answers

0
votes

Did you try

SET 'auto.offset.reset'='earliest';

before running the select query ?