(Apologies for the inevitably terrible formatting as I’m posting from my phone)
I am doing a keyBy and then an aggregate, but Flink is not grouping the data correctly: instead, each event ends up in its own group.
Example:
class Purchase {
    String product;
    Integer quantity;
}

class Filter {
    String product;

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((product == null) ? 0 : product.hashCode());
        return result;
    }
}

class FilteredPurchase {
    Filter filter;
    Purchase purchase;

    public Filter getFilter() {
        return filter;
    }
}
DataStream<FilteredPurchase> =
...
.keyBy("filter") // This works; replacing it with .keyBy(x -> x.getFilter()) doesn't
.sum("purchase.quantity");
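For comparison, here is a minimal plain-Java sketch of a key class that overrides both hashCode() and equals(), which the java.lang.Object contract expects of any type used as a value-based key (for example in a HashMap). The class name ValueFilter is hypothetical, and whether Flink's keyed state compares keys with equals() depends on the state backend, so its relevance to keyBy is an assumption here, not something confirmed above.

```java
import java.util.Objects;

// Hypothetical key type: same single field as the question's Filter,
// but with a value-based equals() in addition to hashCode().
class ValueFilter {
    String product;

    ValueFilter() {} // no-arg constructor, as POJO frameworks usually expect

    ValueFilter(String product) {
        this.product = product;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ValueFilter)) return false;
        ValueFilter other = (ValueFilter) o;
        // Objects.equals treats two null products as equal
        return Objects.equals(product, other.product);
    }

    @Override
    public int hashCode() {
        // Objects.hash(product) computes 31 + product.hashCode() (or 31 for null),
        // the same value as the hand-written hashCode in Filter
        return Objects.hash(product);
    }
}
```

Two ValueFilter instances built from separate events then compare equal whenever their product fields match, including when both are null.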
If we consider the case of a stream that looks like:
[
{"filter": {"product": null}, "purchase": {"product": "apple", "quantity": 10}},
{"filter": {"product": null}, "purchase": {"product": "apple", "quantity": 10}},
{"filter": {"product": "apple"}, "purchase": {"product": "apple", "quantity": 10}},
{"filter": {"product": "apple"}, "purchase": {"product": "apple", "quantity": 10}}
]
I would expect it to be keyed into 2 groups (because there are two distinct filters), with a total of 20 in each. However, I actually end up with 4 groups, each with a total of 10.
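The expected-versus-actual counts can be reproduced outside Flink with a plain java.util.HashMap, which locates keys with hashCode() and then compares them with equals(). This is only an analogy under that assumption, not Flink's implementation: IdentityKey is a hypothetical stand-in for the question's Filter (hashCode() only), ValueKey is a hypothetical variant that also overrides equals(), and the records mirror the four events above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the question's Filter: hashCode() is overridden,
// equals() is not, so equality falls back to object identity.
class IdentityKey {
    final String product;
    IdentityKey(String product) { this.product = product; }

    @Override
    public int hashCode() { return Objects.hash(product); }
}

// Hypothetical variant with the same field, plus a value-based equals().
class ValueKey {
    final String product;
    ValueKey(String product) { this.product = product; }

    @Override
    public boolean equals(Object o) {
        return o instanceof ValueKey && Objects.equals(product, ((ValueKey) o).product);
    }

    @Override
    public int hashCode() { return Objects.hash(product); }
}

class GroupingDemo {
    // Sums quantities per key, conceptually like keyBy(...).sum(...).
    static <K> Map<K, Integer> sumByKey(List<K> keys, List<Integer> quantities) {
        Map<K, Integer> totals = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            totals.merge(keys.get(i), quantities.get(i), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Four events: two with a null filter product, two with "apple";
        // each event carries its own, separately constructed key object.
        List<Integer> qty = List.of(10, 10, 10, 10);
        List<IdentityKey> identityKeys = List.of(
            new IdentityKey(null), new IdentityKey(null),
            new IdentityKey("apple"), new IdentityKey("apple"));
        List<ValueKey> valueKeys = List.of(
            new ValueKey(null), new ValueKey(null),
            new ValueKey("apple"), new ValueKey("apple"));

        System.out.println(sumByKey(identityKeys, qty).values()); // prints [10, 10, 10, 10]
        System.out.println(sumByKey(valueKeys, qty).values());    // prints [20, 20]
    }
}
```

In this analogy the identity-based key yields one group per object, matching the four groups of 10 described above, while the value-based key yields the two groups of 20 that were expected.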
Interestingly, the field-expression version does what I want, but I would like to keep the key as a POJO because I intend to do more with it later on.
Am I missing something here? Can a KeySelector return a POJO?