(Apologies for the inevitably terrible formatting as I’m posting from my phone)

I am doing a keyBy followed by an aggregation, but Flink is not grouping the data correctly: each event ends up in its own group.

Example:

class Purchase {
    String product;
    Integer quantity;
}

class Filter {
    String product;

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((product == null) ? 0 : product.hashCode());
        return result;
    }
}

class FilteredPurchase {
    Filter filter;
    Purchase purchase;
}

DataStream<FilteredPurchase> result =
    ...
    .keyBy("filter")               // This works
    // .keyBy(x -> x.getFilter())  // This doesn't
    .sum("purchase.quantity");

If we consider the case of a stream that looks like:

[
    {"filter": {"product": null}, "purchase": {"product": "apple", "quantity": 10}},
    {"filter": {"product": null}, "purchase": {"product": "apple", "quantity": 10}},
    {"filter": {"product": "apple"}, "purchase": {"product": "apple", "quantity": 10}},
    {"filter": {"product": "apple"}, "purchase": {"product": "apple", "quantity": 10}}
]

I would expect the stream to be keyed into 2 partitions (because there are two distinct filters), with a total of 20 in each. Instead, I end up with 4 partitions, each with a total of 10.

Interestingly, the field expression version does what I want, but I would like to keep everything as a POJO because I intend to do more with it later on.

Am I missing something here? Can a KeySelector return a POJO?

1 Answer

First question: why not just use the product (a String) as your key, since that is the only field in the Filter class? For example:

.keyBy(x -> x.getFilter().getProduct())

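In any case, I think your key class (Filter) also has to implement equals(), consistent with its hashCode(). Without it, two Filter objects with the same product are only equal if they are the same instance, which would explain why each event ends up in its own group.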
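
A minimal sketch of what that could look like, assuming Filter only ever holds the single product field shown in the question. The constructors, getter and setter are my additions for illustration and are not in the original class; java.util.Objects is used so that a null product is handled without extra checks.

```java
import java.util.Objects;

// Sketch of the question's Filter class with a matching equals()/hashCode() pair.
// Constructors and accessors are hypothetical additions, not from the original post.
class Filter {
    private String product;

    // No-arg constructor, kept so the class still looks like a plain POJO.
    public Filter() {}

    public Filter(String product) {
        this.product = product;
    }

    public String getProduct() {
        return product;
    }

    public void setProduct(String product) {
        this.product = product;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Filter)) {
            return false;
        }
        // Null-safe comparison: two null products count as equal.
        return Objects.equals(product, ((Filter) o).product);
    }

    @Override
    public int hashCode() {
        // Null-safe: returns 0 when product is null.
        return Objects.hashCode(product);
    }
}
```

With this in place, two Filter instances with the same product (including two with a null product) compare equal and produce the same hash code, which is what a key object needs in order to be grouped by value rather than by instance.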