This question is based on the solution path in KSQL stream from topic with heterogeneous JSON structures - I get an error when I try to select
from a stream created from a topic with a value_format='json'
.
I can still print
the underlying topic, but doing select * from stream;
throws an error.
We're using Confluent 5.3.1 aka Kafka 2.3.1.
Here's how I (re)create the issue...
- Start with some simple json data
ksql> PRINT 'sms-reporting.status-updated';
Format:JSON
{"ROWTIME":1573688864403,"ROWKEY":"null","acceptTs":"1573603201000","eventType":"REPORTING","isFinal":"true","messageId":"8619Z-1113H-00007-01CF5","nodeId":"86","responseCode":"3041","submissionAttempt":"1","ts":"1573603201567","type":"statusUpdated"}
- Create the stream:
ksql> CREATE STREAM statusupdated ( \
acceptTs VARCHAR, \
eventType VARCHAR, \
isFinal VARCHAR, \
messageId VARCHAR, \
nodeId VARCHAR, \
responseCode VARCHAR, \
submissionAttempt VARCHAR, \
ts VARCHAR, \
type VARCHAR \
) with (kafka_topic='sms-reporting.status-updated',value_format='json');
Message
----------------
Stream created
----------------
ksql> DESCRIBE statusupdated;
Name : STATUSUPDATED
Field | Type
-----------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ACCEPTTS | VARCHAR(STRING)
EVENTTYPE | VARCHAR(STRING)
ISFINAL | VARCHAR(STRING)
MESSAGEID | VARCHAR(STRING)
NODEID | VARCHAR(STRING)
RESPONSECODE | VARCHAR(STRING)
SUBMISSIONATTEMPT | VARCHAR(STRING)
TS | VARCHAR(STRING)
TYPE | VARCHAR(STRING)
-----------------------------------------------
- Attempt (unsuccessfully) to review the contents of the stream:
ksql> SELECT * FROM statusupdated;
Query terminated
java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000","REPORTING","true","8619Z-1113H-00007-01CF5","86","3041","1","1573603201567","statusUpdated"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 206] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000","REPORTING","true","8619Z-1113H-00007-01CF5","86","3041","1","1573603201567","statusUpdated"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 206] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
Caused by: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000","REPORTING","true","8619Z-1113H-00007-01CF5","86","3041","1","1573603201567","statusUpdated"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 206] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
If I now use this stream (on which I still cannot execute a select
statement without receiving an error) to create another stream (which will, of course, be backed by a new topic), I still can't select
on the new stream, but I can print
the new backing topic and the data appears as expected.
- Create a derived stream
ksql> CREATE STREAM statusupdated2 AS SELECT acceptts FROM statusupdated;
Message
----------------------------
Stream created and running
----------------------------
ksql> describe statusupdated2;
Name : STATUSUPDATED2
Field | Type
--------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ACCEPTTS | VARCHAR(STRING)
--------------------------------------
- Attempt (unsuccessfully) to review the contents of the derived stream:
ksql> SELECT * FROM statusupdated2;
Query terminated
java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 113] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 113] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
Caused by: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 113] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
print
the backing topic of the new stream:
ksql> PRINT statusupdated2;
Format:JSON
{"ROWTIME":1573692743435,"ROWKEY":"null","ACCEPTTS":"1573603201000"}
terminal
field from the message that is being sent from the server. We also added@JsonIgnoreProperties
to make sure that any future CLIs reading the old data would just ignore it. When you startup KSQL, what does it say as the CLI/Server version on the splash screen? – Almog