
I have a Flink application running in Amazon's Kinesis Data Analytics Service (managed Flink cluster). In the app, I read in user data from a Kinesis stream, keyBy userId, and then aggregate some user information. After asking this question, I learned that Flink will split the reading of a stream across physical hosts in a cluster. Flink will then forward incoming events to the host that has the aggregator task assigned to the key space that corresponds to the given event.
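For reference, the shape of the job is roughly this (a minimal sketch: the stream name user-events, the region, the CSV-style record format, and the extractUserId helper are placeholders for my actual setup):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class UserAggregationJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

        // Each parallel source subtask is assigned a subset of the stream's shards.
        DataStream<String> events = env.addSource(
                new FlinkKinesisConsumer<>("user-events", new SimpleStringSchema(), consumerConfig));

        // keyBy introduces a hash partition: every event is forwarded to the
        // subtask (possibly on another host) that owns its key group.
        events.map(line -> Tuple2.of(extractUserId(line), 1L))
              .returns(Types.TUPLE(Types.STRING, Types.LONG))
              .keyBy(t -> t.f0)
              .sum(1)
              .print();

        env.execute("user-aggregation");
    }

    // Placeholder: assumes records look like "userId,payload".
    private static String extractUserId(String record) {
        return record.split(",", 2)[0];
    }
}
```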

With this in mind, I am trying to decide what to use as the partition key for the Kinesis stream that my Flink application reads from. My goal is to limit network traffic between hosts in the Flink cluster in order to optimize the performance of my Flink application. I can either partition randomly, so the events are evenly distributed across the shards, or I can partition by userId, so that all of a given user's events land on the same shard.
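To make the two options concrete, the producer side would look roughly like this with the AWS SDK v2 (a sketch; the stream name is a placeholder). Kinesis hashes the partition key to pick a shard, so keying by userId pins each user to one shard, while a random key spreads events evenly:

```java
import java.util.UUID;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;

public class EventProducer {

    // Option 1: key by userId -- all of a user's events land on the same shard.
    static void putKeyedByUser(KinesisClient kinesis, String userId, String payload) {
        kinesis.putRecord(PutRecordRequest.builder()
                .streamName("user-events")           // placeholder stream name
                .partitionKey(userId)                // hash of partitionKey selects the shard
                .data(SdkBytes.fromUtf8String(payload))
                .build());
    }

    // Option 2: random key -- events are spread evenly across shards.
    static void putRandom(KinesisClient kinesis, String payload) {
        kinesis.putRecord(PutRecordRequest.builder()
                .streamName("user-events")
                .partitionKey(UUID.randomUUID().toString())
                .data(SdkBytes.fromUtf8String(payload))
                .build());
    }
}
```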

The decision depends on how Flink works internally. Is Flink smart enough to assign the local aggregator tasks on a host a key space that corresponds to the key space of the shard(s) the Kinesis consumer task on the same host is reading from? If so, then sharding by userId would result in ZERO network traffic, since each event is read by the host that will aggregate it. It seems Flink would not have a clear way of doing this, though, since it does not know how the Kinesis stream is sharded.

OR, does Flink assign each consumer task an arbitrary subset of shards to read and assign each aggregator task an arbitrary portion of the key space? If that is the case, then random partitioning of the stream would seem to produce the least network traffic, since at least some events would be read by a consumer on the same host as the event's aggregator task. That would be better than partitioning by userId and then forwarding essentially every event over the network because the key space of each shard did not align with the key spaces assigned to the local aggregators.
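For what it's worth, Flink's side of the mapping can be inspected: keys are murmur-hashed into key groups, and the key groups are divided among the aggregator subtasks, independently of how Kinesis hashed the partition key. A small experiment with Flink's KeyGroupRangeAssignment utility (from flink-runtime) shows the routing:

```java
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyRoutingDemo {

    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default max parallelism
        int parallelism = 4;      // number of aggregator subtasks

        for (String userId : new String[] {"user-1", "user-2", "user-3"}) {
            // Key group = murmurHash(key.hashCode()) % maxParallelism ...
            int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(userId, maxParallelism);
            // ... and key groups are split into contiguous ranges, one per subtask.
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    userId, maxParallelism, parallelism);
            System.out.printf("key=%s -> keyGroup=%d -> subtask=%d%n", userId, keyGroup, subtask);
        }
    }
}
```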


1 Answer


Ten years ago, it was really important to ship as little data as possible over the network. For the last five years, networks have become so incredibly fast that you notice little difference between accessing a chunk of data over the network and from memory (random access is of course still much faster), so I wouldn't sweat the additional traffic too much (unless you have to pay for it). Anecdotally, Google Datastream started to stream all data through a central shuffle server between two tasks, effectively doubling the traffic; but they still see tremendous speedups on their petabyte network.

So with that in mind, let's move to Flink. Flink currently has no way to dynamically adjust to shards, which can come and go over time. That could change in about half a year with FLIP-27.

For now, there is a workaround, currently used mostly in Kafka-land (static partitioning): DataStreamUtils#reinterpretAsKeyedStream lets you declare a logical keyBy without a physical shuffle. Of course, you are responsible for ensuring that the declared partitioning matches reality, or you will get incorrect results.
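A minimal sketch of that workaround, assuming a hypothetical UserEvent POJO with a getUserId() accessor (the downstream aggregation is elided):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class ReinterpretExample {

    // Declares that 'events' is already partitioned by userId, skipping the
    // physical shuffle a regular keyBy would insert. Flink does NOT verify
    // this: if the upstream partitioning does not match Flink's key groups,
    // keyed state and results will silently be wrong.
    static KeyedStream<UserEvent, String> asKeyedByUser(DataStream<UserEvent> events) {
        return DataStreamUtils.reinterpretAsKeyedStream(
                events,
                new KeySelector<UserEvent, String>() {
                    @Override
                    public String getKey(UserEvent event) {
                        return event.getUserId();
                    }
                });
    }

    // Hypothetical event type.
    public static class UserEvent {
        private String userId;

        public String getUserId() {
            return userId;
        }
    }
}
```

In the Kinesis case, that means the shard-to-subtask assignment and the shard key ranges would have to line up with Flink's key groups for the results to be correct.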