I have a stream in ksql called turnstile_stream. When I query all entries for one value of the station_id column, I get the result below:

ksql> select * from turnstile_stream where station_id = 40820 emit changes;
+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+
|ROWTIME                                             |ROWKEY                                              |STATION_ID                                          |STATION_NAME                                        |LINE                                                |
+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+
|1580720442456                                       |�Ը�
                                                                                                    |40820                                               |Rosemont                                            |blue                                                |
|1580720442456                                       |�Ը�
                                                                                                    |40820                                               |Rosemont                                            |blue                                                |

So there are only two entries in the stream for that station_id. That is correct, since I pushed only two events to the topic that backs the stream. I also have a table, created with the query shown below, which groups by station_id and counts the events in turnstile_stream.

ksql> describe extended turnstile_summary;

Name                 : TURNSTILE_SUMMARY
Type                 : TABLE
Key field            : STATION_ID
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : AVRO
Kafka topic          : turnstile_summary_1 (partitions: 2, replication: 1)

 Field      | Type
----------------------------------------
 ROWTIME    | BIGINT           (system)
 ROWKEY     | VARCHAR(STRING)  (system)
 STATION_ID | INTEGER
 COUNT      | BIGINT
----------------------------------------

Queries that write from this TABLE
-----------------------------------
CTAS_TURNSTILE_SUMMARY_6 : CREATE TABLE TURNSTILE_SUMMARY WITH (KAFKA_TOPIC='turnstile_summary_1', PARTITIONS=2, REPLICAS=1, VALUE_FORMAT='AVRO') AS SELECT
  TURNSTILE_STREAM.STATION_ID "STATION_ID",
  COUNT(*) "COUNT"
FROM TURNSTILE_STREAM TURNSTILE_STREAM
GROUP BY TURNSTILE_STREAM.STATION_ID
EMIT CHANGES;

The problem is that when I query the turnstile_summary table, I get the result below, which doesn't make sense.

ksql> select * from turnstile_summary where station_id = 40820 emit changes;
+------------------------------------------------------------------+------------------------------------------------------------------+------------------------------------------------------------------+------------------------------------------------------------------+
|ROWTIME                                                           |ROWKEY                                                            |STATION_ID                                                        |COUNT                                                             |
+------------------------------------------------------------------+------------------------------------------------------------------+------------------------------------------------------------------+------------------------------------------------------------------+
|1580720442562                                                     |�Ը�
                                                                                                                                |40820                                                             |9                                                                 |
|1580720442562                                                     |�Ը�
                                                                                                                                |40820                                                             |10                                                                |

As you can see, the counts are 9 and 10, which cannot be right, since there are only two rows in the stream for that station_id. I have been scratching my head over this with no luck. Any help is highly appreciated.

1 Answer

I made two changes to make this work.

First, the weird characters in the ROWKEY column of the stream and the table were caused by the long type in the key's Avro schema. I changed the key schema from

{
  "type": "record",
  "name": "arrival.key",
  "fields": [
    {
      "name": "timestamp",
      "type": "long"
    }
  ]
}

to

{
  "namespace": "com.udacity",
  "type": "record",
  "name": "arrival.key",
  "fields": [
    {
      "name": "timestamp",
      "type": "string"     <<-----------
    }
  ]
}
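
One way to check how the key is actually stored is to print a few raw records from the underlying topic. This is a minimal sketch, assuming the topic name app.entity.turnstile from the stream definition and a ksql version whose PRINT statement supports LIMIT; the exact output layout differs between versions.

```sql
-- Show the first two raw records of the topic, including each record's key.
-- A key that is not plain text will show up here as unreadable characters too.
PRINT 'app.entity.turnstile' FROM BEGINNING LIMIT 2;
```

If the key printed here is readable after the schema change, the ROWKEY column in the stream and the table should be readable as well.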

Second, when declaring the stream I set the KEY property, which I should not have done: as the key schema above shows, the topic is keyed by timestamp, not by station_id. So I changed my stream definition from

CREATE STREAM turnstile_stream (
    station_id INT,
    station_name VARCHAR,
    line VARCHAR
) WITH (
    KAFKA_TOPIC='app.entity.turnstile',
    VALUE_FORMAT='AVRO',
    KEY='station_id'
);

to

CREATE STREAM turnstile_stream (
    station_id INT,
    station_name VARCHAR,
    line VARCHAR
) WITH (
    KAFKA_TOPIC='app.entity.turnstile',
    VALUE_FORMAT='AVRO'
);

After making these changes, my aggregate worked correctly.
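
To double-check the result, the same commands used in the question can be rerun. This is only a sketch: it assumes the stream and table were recreated after the changes, and that this ksql version prints a "Key field" line for streams as it does for the table above.

```sql
-- The "Key field" line should no longer name STATION_ID for the stream.
DESCRIBE EXTENDED turnstile_stream;

-- Re-run the query from the question and compare the counts
-- with the number of rows the stream returns for the same station_id.
SELECT * FROM turnstile_summary WHERE station_id = 40820 EMIT CHANGES;
```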