0
votes

The topic contains 10 partitions that receive messages every 3 to 4 seconds from various IoT devices. The message key is LocationId and DeviceId. The value contains device-related details.

The stream topology is deployed to 4 EC2 instances. The process must determine the latest update value from each of the devices and analyze for criticality.

What I am seeing is that since messages are distributed across multiple partitions, the stream consumer sees older messages and they are not in order.

How do I determine the latest message for the specific key?

I am seeing the following message behaviour on the Kafka cluster:

L1D1 at 1:00 am - critical=false (P1)
L2D2 at 1:00 am - critical=false (P1)
L1D1 at 1:02 am - critical=**true** (P2)
L2D2 at 1:05 am - critical=false (P1)
L1D1 at 1:03 am - critical=false (P2)
L2D2 at 1:03 am - critical=false (P1)

Notice that at 1:02 device D1 had a critical alert, but at 1:03 it did not. The stream may process these messages as 1:03 and then 1:02 (or in any other order, depending on the partition).

How do I determine the latest message for a specific device efficiently, since the order is not guaranteed?

2 Answers

2
votes

> How do I determine the latest message for a specific device efficiently, since the order is not guaranteed?

Kafka guarantees message ordering within a topic partition, but not across multiple topic partitions. You need to ensure that messages from the same device are always sent to the same topic partition. If you haven't changed Kafka's default settings, you can achieve this by using a device-specific identifier (think: DeviceId) as the message key, because the default partitioner picks the partition from a hash of the key.
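To illustrate why the same key always lands in the same partition, here is a minimal sketch of key-based partitioning. It is not Kafka's actual implementation: Kafka's default partitioner uses a murmur2 hash of the serialized key, while this sketch swaps in `Arrays.hashCode` as a stand-in. The class and method names are made up for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyPartitionSketch {

    // Simplified stand-in for key-based partitioning:
    // hash the key bytes, force the hash non-negative, take it modulo the partition count.
    // ASSUMPTION: Arrays.hashCode replaces the murmur2 hash that Kafka really uses.
    public static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 10; // the topic in the question has 10 partitions

        // The same key is listed twice on purpose: both lines print the same partition.
        for (String key : new String[] {"D1", "D1", "D2"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

The point is only that the partition is a pure function of the key: as long as every message from a device carries the same key, all of them end up in one partition and are read back in the order they were written.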

> What I am seeing is that since messages are distributed across multiple partitions, the stream consumer sees older messages and they are not in order.

If you use a composite key like (LocationId, DeviceId), then you will not get updates for the same device in order: because the message key also includes LocationId, the device's messages are distributed across multiple partitions.

> The process must determine the latest update value from each of the devices and analyze for criticality. [...] How do I determine the latest message for a specific device efficiently, since the order is not guaranteed?

In your case I would change the message key from (LocationId, DeviceId) to just DeviceId. Let's call this "stream D".

If you still need the original grouping by (LocationId, DeviceId), you can achieve this by subsequently re-grouping (aka re-keying, aka re-partitioning) stream D from DeviceId to (LocationId, DeviceId) into a new, derived stream LD.
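Here is a rough plain-Java simulation of that idea, without any Kafka dependency. It assumes stream D arrives in order per device (which is what keying by DeviceId gives you), so "latest" is simply the last value seen. The `Reading` type, its fields, and the `"L1/D1"` key format are all hypothetical. In Kafka Streams the corresponding pieces would be roughly a `KTable` for the latest value per key and `KStream#selectKey` for the re-keying.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RekeySketch {

    /** Hypothetical message value; the field names are assumptions. */
    public static final class Reading {
        public final String locationId;
        public final String deviceId;
        public final boolean critical;

        public Reading(String locationId, String deviceId, boolean critical) {
            this.locationId = locationId;
            this.deviceId = deviceId;
            this.critical = critical;
        }
    }

    // Stream D is keyed by DeviceId, so each device's readings arrive in order.
    // The latest reading per device is therefore the last one seen (last write wins).
    public static Map<String, Reading> latestByDevice(List<Reading> streamD) {
        Map<String, Reading> latest = new LinkedHashMap<>();
        for (Reading r : streamD) {
            latest.put(r.deviceId, r);
        }
        return latest;
    }

    // Derived stream LD: the same readings, re-keyed to (LocationId, DeviceId).
    public static List<Map.Entry<String, Reading>> rekeyToLocationDevice(List<Reading> streamD) {
        List<Map.Entry<String, Reading>> streamLD = new ArrayList<>();
        for (Reading r : streamD) {
            streamLD.add(Map.entry(r.locationId + "/" + r.deviceId, r));
        }
        return streamLD;
    }

    public static void main(String[] args) {
        // D1's readings from the question, in the order the device sent them.
        List<Reading> streamD = List.of(
                new Reading("L1", "D1", false),  // 1:00
                new Reading("L1", "D1", true),   // 1:02
                new Reading("L1", "D1", false)); // 1:03

        Reading latest = latestByDevice(streamD).get("D1");
        System.out.println("D1 latest critical=" + latest.critical);

        for (Map.Entry<String, Reading> e : rekeyToLocationDevice(streamD)) {
            System.out.println(e.getKey() + " critical=" + e.getValue().critical);
        }
    }
}
```

Note that the last-write-wins step is only correct because the input is ordered per device. With the original composite key spread over partitions, you would not have that guarantee.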

0
votes

What is your stream processing strategy: KSQL or the SDK? If you use KSQL, you just need to create a stream/table.

See: https://docs.confluent.io/current/ksql/docs/developer-guide/create-a-table.html