3
votes

How does a Kafka sink connector ensure message ordering while fetching messages from partitions? I have multiple partitions, and I have ensured message ordering at publish time by using hash keys per partition. Now, when more than one sink task (and their workers) are scaled out across multiple JVMs, with the responsibility of fetching messages from the same partition and notifying a destination system via HTTP, how can I guarantee that the destination system receives the messages in order?

1
Kafka Connect sink tasks can be thought of as consumers that are part of the same consumer group. You cannot have multiple tasks consuming from the same partition at any given moment. This holds across the entire set of workers running tasks. Typically, ordering guarantees in Kafka are made by having a single-partition topic. - dawsaw
@dawsaw I am referring to workers. Workers are processes, and I assume workers would be spawned (auto-scaled). Let us assume each of those workers has the same connector (conn1) instance. I would also have the workers (connector and tasks) reading from the same partition to maintain order at the time of "fetch". However, there is a risk that the workers notify the destination (in my case, by invoking HTTP endpoints) in a different order from the fetch order, since the workers run in separate JVM processes. - bhalochele
I encourage you to read this documentation on how distributed workers scale: docs.confluent.io/3.1.1/connect/…. You won't have a scenario where multiple tasks associated with the same connector are allowed to talk to the same partition, regardless of the number of workers you have. Unless I misunderstand your scenario. - dawsaw

1 Answer

1
votes

Each sink task receives events in order from each of its assigned topic partitions, but as soon as a record leaves the Kafka protocol handling and is sent to a remote destination, whether that is a file or an HTTP endpoint, order can only be guaranteed based on that system's ordering semantics.

For example, if you're writing to Elasticsearch, you can "order" events (in Kibana) by specifying the timestamp field to index by. The same applies to any SQL or NoSQL database.

A filesystem, on the other hand, would order files by modification time, but events within any given file aren't guaranteed to be ordered (unless they all come from one partition).

I find it unlikely that an HTTP REST endpoint will know what order events need to be collected in; that logic would need to be implemented inside the server behind the endpoint. One option would be to post events to an endpoint that accepts the partition number and the offset each record came from, so that the server can work out the order itself.
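
As a rough illustration of that last option, here is a minimal sketch of what the receiving server could do with the partition and offset it is given. It is not part of Kafka Connect: the `OffsetGate` class, its `accept` method, and the `orders` topic name are all hypothetical. The sketch assumes the sender posts each record's topic, partition, and offset, and that the server only needs to reject duplicates or stale records rather than buffer and reorder them. It also allows gaps between offsets, since offsets within a partition are not always contiguous (for example, on compacted topics).

```python
from dataclasses import dataclass, field
from typing import Dict, Tuple


@dataclass
class OffsetGate:
    """Hypothetical server-side helper: remembers the highest offset
    already applied for each (topic, partition) pair."""

    last_applied: Dict[Tuple[str, int], int] = field(default_factory=dict)

    def accept(self, topic: str, partition: int, offset: int) -> bool:
        """Return True if the record is newer than anything applied so far
        for this partition, and record its offset; otherwise return False."""
        key = (topic, partition)
        if offset <= self.last_applied.get(key, -1):
            # Duplicate or out-of-order delivery for this partition.
            return False
        self.last_applied[key] = offset
        return True


# Example: records as they might arrive at the HTTP endpoint.
gate = OffsetGate()
results = [
    gate.accept("orders", 0, 5),  # first record seen for partition 0
    gate.accept("orders", 0, 6),  # next record, in order
    gate.accept("orders", 0, 5),  # redelivery of an older record
    gate.accept("orders", 1, 2),  # partitions are tracked independently
]
```

Note that this only gives per-partition ordering, which matches what Kafka itself provides. If the destination needs an order across partitions, it would have to define that itself, for instance by a timestamp carried in the payload.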