
I want to implement an AggregateFunction with a KeyedProcessFunction, because the default AggregateFunction does not support rich functions. I also tried AggregateFunction + ProcessWindowFunction (https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html), but that does not meet my needs either, so I have to implement the aggregation with a plain KeyedProcessFunction. The details of my problem are as follows:

In the process function, I define windowState, a MapState that stages the aggregated value of the elements. It is initialized in open():

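@Override
public void open(Configuration parameters) throws Exception {
    followCacheMap = FollowSet.getInstance();
    // windowStateDescriptor describes a MapState<String, AggregateFollow> (defined elsewhere)
    windowState = getRuntimeContext().getMapState(windowStateDescriptor);
    // holds the timestamp of the currently registered timer for this key
    currentTimer = getRuntimeContext().getState(
            new ValueStateDescriptor<Long>("timer", Long.class));
}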

In processElement(), I use windowState to aggregate the elements of the window and register the first timer, which later clears the current window state:

@Override
public void processElement(FollowData value, Context ctx, Collector<FollowData> out) throws Exception {
    Long timer = currentTimer.value();
    // register the first timer only if none is pending for this key
    if ((timer == null || timer == 0L) && value.getClickTime() != null) {
        currentTimer.update(value.getClickTime() + interval);
        ctx.timerService().registerEventTimeTimer(currentTimer.value());
    }
    doMyAggregation(value);   // updates windowState via put()
}

In onTimer(), I first register the next timer one minute later, then emit and clear the window state:

 @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<FollowData> out) throws Exception {
        currentTimer.update(timestamp + interval);   // interval is 1 minute
        ctx.timerService().registerEventTimeTimer((long)currentTimer.value());
        
        out.collect(windowState);
        windowState.clear();
     }
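To see when onTimer() runs under this scheme (first timer at clickTime + interval, then each onTimer() call registers timestamp + interval), here is a minimal plain-Java model of one key's event-time timers. It is not Flink code: TimerModel, advanceWatermark and the TreeSet are hypothetical stand-ins, assuming only Flink's documented behavior that an event-time timer fires once the watermark reaches its timestamp and that timers are de-duplicated per key and timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical plain-Java model of one key's event-time timers; this is NOT the Flink API.
class TimerModel {
    static final long INTERVAL = 60_000L;            // 1 minute, as in the question

    // A set, so registering the same timestamp twice yields one timer (like Flink's per-key/timestamp de-duplication).
    private final TreeSet<Long> timers = new TreeSet<>();
    private Long currentTimer = null;                // plays the role of the "timer" ValueState
    final List<Long> fired = new ArrayList<>();      // timestamps at which onTimer ran

    // Like processElement: only register a timer if none is pending.
    void processElement(long clickTime) {
        if (currentTimer == null || currentTimer == 0L) {
            currentTimer = clickTime + INTERVAL;
            timers.add(currentTimer);
        }
    }

    // Fires, in timestamp order, every timer whose timestamp is <= the new watermark.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long ts = timers.pollFirst();
            onTimer(ts);
        }
    }

    // Like onTimer: emitting and clearing the window would happen here, then the next timer is registered.
    private void onTimer(long timestamp) {
        fired.add(timestamp);
        currentTimer = timestamp + INTERVAL;
        timers.add(currentTimer);
    }
}
```

With elements at clickTime 1000 and 5000 and the watermark advanced to 200000, the model fires at 61000, 121000 and 181000; the second element registers nothing because a timer is already pending. Because onTimer() always registers another timer, each key in this scheme keeps firing once per minute even after it stops receiving elements.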

But when the program runs, windowState is always empty in onTimer(), even though it is not empty in processElement(). I don't know why this happens; maybe the execution logic is different. How can I fix this? Thanks in advance!



Update: here is the doMyAggregation() part.


windowState is a MapState whose key is "mykey" and whose value is a self-defined object, AggregateFollow:

public class AggregateFollow {
    private String clicked;                    // "0" until a click is seen, then "1"
    private String unionid;
    private ArrayList<FollowData> allFollows;
    private int enterCnt;
    private Long clickTime;
    // getters and setters omitted
}

doMyAggregation(value) looks roughly like the code below. It collects every value whose source field is "follow", but if no value whose source is "click" arrives within 1 minute, those "follow" values should be discarded. In short, it behaves like a join between the "follow" data and the "click" data:

AggregateFollow acc = windowState.get(windowkey);   // returns null if nothing has been put for this key yet
String flag = acc.getClicked();
ArrayList<FollowData> followDataList = acc.getAllFollows();
if ("0".equals(flag)) {
    if ("follow".equals(value.getSource())) {
        followDataList.add(value);
        acc.setAllFollows(followDataList);
        // no windowState.put(windowkey, acc) in this branch
    }
    if ("click".equals(value.getSource())) {
        String unionid = value.getUnionid();
        Long clickTime = value.getClickTime();
        if (followDataList.size() > 0) {
            ArrayList<FollowData> listNew = new ArrayList<>();
            // mark every buffered follow as joined with this click
            for (FollowData followData : followDataList) {
                followData.setUnionid(unionid);
                followData.setClickTime(clickTime);
                followData.setSource("joined_flag");
            }
            // listNew is still empty here, so the joined follows are replaced by an empty list
            acc.setAllFollows(listNew);
        }
        acc.setClicked("1");
        acc.setUnionid(unionid);
        acc.setClickTime(clickTime);
        windowState.put(windowkey, acc);
    }
} else if ("1".equals(flag)) {
    if ("follow".equals(value.getSource())) {
        // a click was already seen: join this follow immediately
        value.setUnionid(acc.getUnionid());
        value.setClickTime(acc.getClickTime());
        value.setSource("joined_flag");
        followDataList.add(value);
        acc.setAllFollows(followDataList);
        windowState.put(windowkey, acc);
    }
}
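For comparison, here is a minimal plain-Java sketch of the join that doMyAggregation is described as doing. It assumes a HashMap in place of the MapState and uses simplified, hypothetical versions of FollowData and AggregateFollow with public fields; FollowClickJoin and aggregate are illustrative names. Unlike the snippet above, it (a) creates the accumulator when get() returns null, (b) keeps the joined follow records, assuming that is what listNew was meant to hold, and (c) always writes the accumulator back with put(). Point (c) can matter in Flink: depending on the state backend (for example RocksDB), the object returned by get() is a deserialized copy, so changes made to it without a put() may not be persisted. This is a sketch of the described logic, not a confirmed diagnosis of the empty state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for the question's classes (public fields instead of getters/setters).
class FollowData {
    String source;     // "follow" or "click" (or "joined_flag" after the join)
    String unionid;
    Long clickTime;

    FollowData(String source, String unionid, Long clickTime) {
        this.source = source;
        this.unionid = unionid;
        this.clickTime = clickTime;
    }
}

class AggregateFollow {
    String clicked = "0";                              // "0" until a click has been seen
    String unionid;
    Long clickTime;
    List<FollowData> allFollows = new ArrayList<>();
}

class FollowClickJoin {
    // A Map stands in for the keyed MapState<String, AggregateFollow>.
    static void aggregate(Map<String, AggregateFollow> windowState, String windowKey, FollowData value) {
        AggregateFollow acc = windowState.get(windowKey);
        if (acc == null) {                             // first element for this key
            acc = new AggregateFollow();
        }
        if ("0".equals(acc.clicked)) {
            if ("follow".equals(value.source)) {
                acc.allFollows.add(value);             // buffer until a click arrives
            } else if ("click".equals(value.source)) {
                for (FollowData f : acc.allFollows) {  // join the buffered follows with the click
                    f.unionid = value.unionid;
                    f.clickTime = value.clickTime;
                    f.source = "joined_flag";
                }
                acc.clicked = "1";
                acc.unionid = value.unionid;
                acc.clickTime = value.clickTime;
            }
        } else if ("1".equals(acc.clicked) && "follow".equals(value.source)) {
            value.unionid = acc.unionid;               // click already seen: join immediately
            value.clickTime = acc.clickTime;
            value.source = "joined_flag";
            acc.allFollows.add(value);
        }
        windowState.put(windowKey, acc);               // always write the accumulator back
    }
}
```

Feeding a follow, then a click (unionid "u1", clickTime 100), then another follow under "mykey" leaves one accumulator with clicked "1" and two follow records, both marked joined_flag with unionid "u1".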

Because of performance problems, the original window API is not an option for me. The only way I can see is to use a ProcessFunction with onTimer() plus a Guava cache. Thanks a lot!


2 Answers


If windowState is empty, it would be helpful to see what doMyAggregation(value) is doing.

It's difficult to debug this, or propose good alternatives, without more context, but out.collect(windowState) isn't going to work as intended. What you probably want instead is to iterate over this MapState and collect each key/value pair it contains to the output.
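That iterate-and-collect suggestion can be modeled in plain Java. In Flink the loop would run over windowState.entries() (or windowState.values()) and call out.collect(...) once per record before windowState.clear(). In this sketch a Map stands in for the MapState and a List for the Collector. Each map value is assumed to boil down to its list of buffered records (like AggregateFollow's allFollows), and flushWindow and the keep predicate are hypothetical names. The predicate is one way to drop follow records that never got a matching click, as the question requires.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model: a Map stands in for MapState<String, ...>, a List for Collector<...>.
final class WindowFlush {
    private WindowFlush() {}

    // Emits every kept record of every map entry individually, then clears the state for the next window.
    static <T> void flushWindow(Map<String, List<T>> windowState, Predicate<T> keep, List<T> out) {
        for (Map.Entry<String, List<T>> entry : windowState.entrySet()) {
            for (T item : entry.getValue()) {
                if (keep.test(item)) {
                    out.add(item);                 // in Flink: out.collect(item)
                }
            }
        }
        windowState.clear();                       // start the next 1-minute window empty
    }
}
```

For example, with one entry holding "joined_flag", "follow", "joined_flag" and a predicate that keeps only joined records, two records are emitted and the map ends up empty.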


I changed the type of windowState from MapState to ValueState, and that solved the problem. Maybe it is a bug or something; can anyone explain this?