
I have the stream shown below and I want to create another stream from it. I am running the command below and getting the following error. Am I missing something?

ksql> create stream down_devices_stream as select * from fakedata119 where deviceProperties['status']='false';
Failed to generate code for SqlPredicate.filterExpression: (FAKEDATA119.DEVICEPROPERTIES['status'] = 'false')schema:org.apache.kafka.connect.data.SchemaBuilder@6e18dbbfisWindowedKey:false
Caused by: Line 1, Column 180: Operator "<=" not allowed on reference operands

ksql> select * from fakedata119;
1529505497087 | null | 19 | visibility sensors | Wed Jun 20 16:38:17 CEST 2018 | {visibility=74, status=true}
1529505498087 | null | 7 | fans | Wed Jun 20 16:38:18 CEST 2018 | {temperature=44, rotationSense=1, status=false, frequency=49}
1529505499088 | null | 28 | air quality monitors | Wed Jun 20 16:38:19 CEST 2018 | {coPpm=257, status=false, Co2Ppm=134}
1529505500089 | null | 4 | fans | Wed Jun 20 16:38:20 CEST 2018 | {temperature=42, rotationSense=1, status=true, frequency=51}
1529505501089 | null | 23 | air quality monitors | Wed Jun 20 16:38:21 CEST 2018 | {coPpm=158, status=true, Co2Ppm=215}

ksql> describe fakedata119;

     Field            | Type                                 
    ---------------------------------------------------------
     ROWTIME          | BIGINT           (system)            
     ROWKEY           | VARCHAR(STRING)  (system)            
     DEVICEID         | INTEGER                              
     CATEGORY         | VARCHAR(STRING)                      
     TIMESTAMP        | VARCHAR(STRING)                      
     DEVICEPROPERTIES | MAP[VARCHAR(STRING),VARCHAR(STRING)] 
What version of KSQL are you using? - Andrew Coates
I am using Version: 4.1.1 - jenjen
We'll try and help out here, but I can see you've had several questions with KSQL and maybe interactive chat will also help - there's a Slack community here with a dedicated #ksql channel: slackpass.io/confluentcommunity - Robin Moffatt

2 Answers

0
votes

Without seeing your input data, I have guessed that it looks something like this:

{
  "id": "a42",
  "category": "foo",
  "timestamp": "2018-06-21 10:04:57 BST",
  "deviceID": 42,
  "deviceProperties": {
    "status": "false",
    "foo": "bar"
  }
}

If so, you are better off using EXTRACTJSONFIELD to access the nested values and build predicates. Declare deviceProperties as a VARCHAR rather than a MAP so that it holds the raw JSON:

CREATE STREAM fakeData223 (Id VARCHAR, category VARCHAR, timeStamp VARCHAR, \
                           deviceID INTEGER, deviceProperties VARCHAR) \
WITH (KAFKA_TOPIC='test_map2', VALUE_FORMAT='JSON');


ksql> SELECT EXTRACTJSONFIELD(DEVICEPROPERTIES,'$.status') AS STATUS FROM fakeData223;
false

ksql> SELECT *  FROM fakeData223 \
      WHERE EXTRACTJSONFIELD(DEVICEPROPERTIES,'$.status')='false';
1529572405759 | null | a42 | foo | 2018-06-21 10:04:57 BST | 42 | {"status":"false","foo":"bar"}

I've logged the error you found as a bug, which you can track here: https://github.com/confluentinc/ksql/issues/1474
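
Applied to the original goal, the workaround might look roughly like the sketch below. It is untested and rests on assumptions: the topic name 'fakedata119' and the stream name fakedata119_json are hypothetical (substitute your real topic), and the column list is copied from the DESCRIBE output in the question, with deviceProperties re-declared as VARCHAR.

```sql
-- Hypothetical: re-register the source topic with deviceProperties as a raw JSON string.
-- The topic name 'fakedata119' is an assumption, not taken from the question.
CREATE STREAM fakedata119_json (deviceId INTEGER, category VARCHAR, timestamp VARCHAR, \
                                deviceProperties VARCHAR) \
WITH (KAFKA_TOPIC='fakedata119', VALUE_FORMAT='JSON');

-- Build the derived stream by filtering on the extracted status value.
CREATE STREAM down_devices_stream AS \
SELECT * FROM fakedata119_json \
WHERE EXTRACTJSONFIELD(deviceProperties,'$.status')='false';
```

EXTRACTJSONFIELD returns a string, so the comparison is against the literal 'false' rather than a boolean.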

0
votes

I've added a test to cover this use case:

https://github.com/confluentinc/ksql/pull/1476/files

Interestingly, this passes on our master and upcoming 5.0 branches, but fails on 4.1.

So... it looks like this is an issue in the version you're using, but the good news is that it's fixed in the upcoming release. Plus, you can use Robin's workaround above for now.

Happy querying!

Andy