0
votes

I have a keyd stream of data that looks like:

    {
        summary:Integer
        uid:String
        key:String
        .....
    }

I need to aggregate the summary values in some time range, and once I achieved a specifc number , to flush the summary and all the of the UID'S that influenced the summary to database/log file.

after the first flush , I want to discare all the uid's from the memory , and just flush every new item immediatelly.

So I tried this aggregate function.

public class AggFunc implements AggregateFunction<Item, Acc, Tuple2<Integer,List<String>>>{

    private static final long serialVersionUID = 1L;

    @Override
    public Acc createAccumulator() {
        return new Acc());
    }

    @Override
    public Acc add(Item value, Acc accumulator) {
        accumulator.inc(value.getSummary());
        accumulator.addUid(value.getUid);
        return accumulator;
    }

    @Override
    public Tuple2<Integer,List<String>> getResult(Acc accumulator) {
        List<String> newL = Lists.newArrayList(accumulator.getUids());
        accumulator.setUids(Lists.newArrayList());
        return Tuple2.of(accumulator.getSum(), newL);
    }

    @Override
    public Acc merge(Acc a, Acc b) {
        .....
    }

}

and in the aggregate process function , I flush the list to state, and if I need to save to dataBase I'm clearing the state and save flag in the state to indicate it.

But it seems crooked to me. And I'm not sure if that would work well for me.

Is there a better solution to this situation?

1
It is confusing your question. For me it seems that you need 2 queries. One to sum in a window time and other is a windowless. - Felipe
no. both of them are need the same window. the list of the "uids" gives me required debug indication on the summary - user2840073

1 Answers

0
votes

Work with a state inside a rich function. Keep adding the uid in your state and when the window triggers to flush the values. This page from the official documentation has an example.

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html#using-keyed-state

For your case a ListState will work well.

EDIT:

The solution above is for non-window case. for window case simply use the aggrgation with apply function that can have a rich window function