I am using KSQL to track the delay between stops in a fleet management system. For simplicity, I have two streams, trips and tasks, which get their data from Debezium; so far, so good.
My problem is that when I create a KSQL table holding aggregated data, I expect its backing topic to eventually contain compacted results, but it does not, as the example below shows:
-- trips stream
CREATE STREAM trips_raw (
  id BIGINT, gross_merchandise_value DOUBLE, vehicle_id BIGINT, trip_code STRING,
  status STRING, time_slot STRING, number_of_orders INTEGER, supplier_id INTEGER,
  trip_start_time BIGINT, agent_id INTEGER, trip_number INTEGER, returnes_handled BOOLEAN,
  modification_date BIGINT, created_by INTEGER, modified_by INTEGER, creation_date BIGINT
)
WITH (KAFKA_TOPIC='trips', VALUE_FORMAT='json');
-- tasks stream
CREATE STREAM tasks_raw (
  id BIGINT, delivery_trip_id BIGINT, agent_id INTEGER, creation_date BIGINT,
  modification_date BIGINT, status STRING, created_by INTEGER, modified_by INTEGER,
  request_id BIGINT
)
WITH (KAFKA_TOPIC='tasks', VALUE_FORMAT='json');
-- The aggregated table (a deliberately simple view)
CREATE TABLE trips_actions_count AS
SELECT COUNT(1), id FROM trips_raw
GROUP BY id;
----- TEST DATA ------
INSERT INTO trips_raw (
id, gross_merchandise_value, vehicle_id, trip_code, status, trip_start_time, modification_date, created_by, modified_by, creation_date
) VALUES (
1, 100.5, 523, 'TRIP_1', 'CREATED', 1616480285000, 1616530285000, 123, 123, 1616444781000
);
INSERT INTO trips_raw (
id, gross_merchandise_value, vehicle_id, trip_code, status, trip_start_time, modification_date, created_by, modified_by, creation_date
) VALUES (
1, 100.5, 523, 'TRIP_1', 'ARRIVED', 1616480285000, 1616540285000, 123, 123, 1616444781000
);
INSERT INTO trips_raw (
id, gross_merchandise_value, vehicle_id, trip_code, status, trip_start_time, modification_date, created_by, modified_by, creation_date
) VALUES (
1, 100.5, 523, 'TRIP_1', 'COMPLETED', 1616480285000, 1616550285000, 123, 123, 1616444781000
);
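To make the table's output concrete, here is a rough Python model (not ksqlDB code) of what a COUNT ... GROUP BY table writes to its changelog topic: one updated row per input event. It assumes record caching does not merge the updates, and changelog_for_count is a hypothetical helper name used only for illustration.

```python
from collections import defaultdict


def changelog_for_count(keys):
    """Hypothetical model of a COUNT(...) GROUP BY key table.

    Every input event bumps the count for its key and emits one
    (key, value) changelog record, i.e. one update to the table row.
    """
    counts = defaultdict(int)
    changelog = []
    for key in keys:
        counts[key] += 1
        changelog.append((key, {"KSQL_COL_0": counts[key]}))
    return changelog


# The three test INSERTs above all carry id = 1.
records = changelog_for_count([1, 1, 1])
for key, value in records:
    print(key, value)
```

All three modeled records share key 1. Note that kafka-console-consumer prints only record values by default (key printing is enabled with --property print.key=true), so the consumer output in this post does not show the keys.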
When I tail the topic backing the TRIPS_ACTIONS_COUNT table, I get the following results:
kafka-console-consumer --bootstrap-server localhost:9092 --topic TRIPS_ACTIONS_COUNT --from-beginning
{"KSQL_COL_0":1}
{"KSQL_COL_0":2}
{"KSQL_COL_0":3}
kafka-topics --bootstrap-server localhost:9092 --describe --topics-with-overrides --topic TRIPS_ACTIONS_COUNT
Topic: TRIPS_ACTIONS_COUNT PartitionCount: 3 ReplicationFactor: 1 Configs: cleanup.policy=compact,segment.bytes=1073741824
I assumed that TRIPS_ACTIONS_COUNT is compacted, so that a consumer reading it should only get the latest value for a given key, which in my case is {"KSQL_COL_0":3}.
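The compaction behavior I am expecting can be written as a small Python model (hypothetical, not Kafka code): keep only the last record for each key. It is a simplification that ignores tombstones and ignores when, and on which log segments, the broker's log cleaner actually runs; compact is an illustrative name.

```python
def compact(records):
    """Hypothetical model of Kafka log compaction semantics.

    Keeps only the last record for each key, preserving the relative
    order of the surviving records. Tombstones and the timing of the
    broker's log cleaner are deliberately not modeled.
    """
    # Remember the offset of the latest record seen for every key.
    last_offset = {}
    for offset, (key, _value) in enumerate(records):
        last_offset[key] = offset
    # Keep a record only if no later record has the same key.
    return [rec for offset, rec in enumerate(records) if last_offset[rec[0]] == offset]


log = [
    (1, {"KSQL_COL_0": 1}),
    (1, {"KSQL_COL_0": 2}),
    (1, {"KSQL_COL_0": 3}),
]
print(compact(log))  # [(1, {'KSQL_COL_0': 3})]
```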
I think I am missing something, but I am not sure what it is yet.