Let's say that I know my dataset is unbalanced and I know the distribution of the keys. I'd like to leverage this to write a custom partitioner that gets the most out of the operator instances.
I know about DataStream#partitionCustom. However, if my stream is keyed, will it still work properly? My job would look something like:
// partitionCustom returns a plain DataStream, so the keyed context is lost here
DataStream<T> afterCustomPartition = keyedStream.partitionCustom(new MyPartitioner(), new MyPartitionKeySelector());
DataStreamUtils.reinterpretAsKeyedStream(afterCustomPartition, new MyGroupByKeySelector<>()).sum();
What I'm trying to achieve is:
- Having the stream keyed by some key so that the reduce function is only ever called with elements sharing that key.
- Having the group-by split the work across nodes based on some custom partitioning.
- The custom partitioning returning a number based on the number of parallel operator instances (which will be fixed and not subject to rescaling).
- The custom partitioning returning different values from the keyBy. However, keyBy(x) = keyBy(y) => partition(x) = partition(y).
- Having pre-aggregation to minimize network traffic before partitioning.
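To make the constraint keyBy(x) = keyBy(y) => partition(x) = partition(y) concrete, here is a minimal plain-Java sketch (no Flink dependency) that checks it over a sample of elements. The class name `PartitionConsistencyCheck` and the helpers `isConsistent` and `pair` are hypothetical names I made up for illustration; they are not part of any Flink API.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class PartitionConsistencyCheck {

    // Returns true if all elements sharing a group key map to the same partition index.
    static <T, K> boolean isConsistent(List<T> elements,
                                       Function<T, K> groupKey,
                                       Function<T, Integer> partition) {
        Map<K, Integer> partitionOfKey = new HashMap<>();
        for (T element : elements) {
            K key = groupKey.apply(element);
            Integer target = partition.apply(element);
            // Remember the first partition seen for this key; later ones must match it.
            Integer previous = partitionOfKey.putIfAbsent(key, target);
            if (previous != null && !previous.equals(target)) {
                return false;
            }
        }
        return true;
    }

    // Hypothetical helper that builds a (key, value) pair.
    static Map.Entry<Integer, String> pair(int key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> data = Arrays.asList(
                pair(0, "A"), pair(0, "B"), pair(0, "C"), pair(1, "D"), pair(2, "E"));

        // Partitioning that depends only on the group key: the constraint holds.
        System.out.println(isConsistent(data, Map.Entry::getKey,
                e -> e.getKey() == 0 ? 0 : 1)); // prints true

        // Partitioning that depends on the value: (0, A) and (0, B) are split apart.
        System.out.println(isConsistent(data, Map.Entry::getKey,
                e -> e.getValue().equals("A") ? 0 : 1)); // prints false
    }
}
```

In other words, the partition function may be coarser than the keyBy (several keys per partition), but never finer.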
Example of the use-case:
- Dataset: [(0, A), (0, B), (0, C), (1, D), (2, E)]
- Number of parallel operator instances: 2
- Group by function: returns the 1st element of the pair
- Partition function: returns 0 for key 0 and 1 for keys 1 and 2.
- Advantage: this deals with the data skew that could otherwise send keys 0 and 1 to the same operator instance, in which case one operator instance would receive 80% of the dataset.
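As a sanity check of the numbers in this example, here is a small plain-Java sketch that counts how many elements each operator instance would receive. The `partition(key, numPartitions)` signature mirrors the shape of Flink's `Partitioner#partition`, but the class `SkewAwarePartitionDemo` and the hard-coded mapping are only assumptions taken from the example above, not a real job.

```java
import java.util.Arrays;
import java.util.List;

class SkewAwarePartitionDemo {

    // Mapping assumed from the example: key 0 -> instance 0, keys 1 and 2 -> instance 1.
    // The modulo keeps the result in range if numPartitions is smaller than 2.
    static int partition(int key, int numPartitions) {
        int target = (key == 0) ? 0 : 1;
        return target % numPartitions;
    }

    // Counts how many elements land on each parallel operator instance.
    static int[] load(List<Integer> keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (int key : keys) {
            counts[partition(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Keys of the dataset [(0, A), (0, B), (0, C), (1, D), (2, E)]
        List<Integer> keys = Arrays.asList(0, 0, 0, 1, 2);
        System.out.println(Arrays.toString(load(keys, 2))); // prints [3, 2]
    }
}
```

With this mapping the two instances receive 3 and 2 elements (60% / 40%), compared with 4 and 1 (80% / 20%) if keys 0 and 1 ended up together.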