
According to the documentation linked below, "A global windows assigner assigns all elements with the same key to the same single global window."

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html

Then I checked the source code and found that the assignWindows method of GlobalWindows just returns the global window and does nothing with the element parameter. So how are all elements with the same key assigned to the same single global window?

https://github.com/apache/flink/blob/12b4185c6c09101b64e12a84c33dc4d28f95cff9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java

@Override
public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    return Collections.singletonList(GlobalWindow.get());
}

1 Answer


In Flink, windows and keys are largely independent of each other. Stream elements can be grouped by key and by window, and these are orthogonal dimensions. (The combination of a window with a key is called a pane.)

Window instances don't have keys, and neither do window assigners. Instead, keys and key-partitioned state are part of the runtime context in which windows are evaluated.

When I was trying to understand the relationship of keys to window assigners I found it helpful to read through the WindowOperator's implementation of processElement. This code is called as each stream element arrives at a window operator. Paying attention to the role of the key, while leaving out a lot of other details, we see this:

public void processElement(StreamRecord<IN> element) throws Exception {
    final Collection<W> elementWindows = windowAssigner.assignWindows(
        element.getValue(), element.getTimestamp(), windowAssignerContext);

    ...

    final K key = this.<K>getKeyedStateBackend().getCurrentKey();

    ...

    for (W window: elementWindows) {

        ...

        windowState.add(element.getValue());

        triggerContext.key = key;
        triggerContext.window = window;

        TriggerResult triggerResult = triggerContext.onElement(element);
        if (triggerResult.isFire()) {
            ...
            emitWindowContents(window, contents);
        }

        ...
    }
}

Here you can see that the key is available to the window operator via getKeyedStateBackend(), but isn't even retrieved until after getting the windows for this element from the window assigner. The window assigner does its job without any concern for keys.

The key is fetched afterward, though, so that it can be made available to the trigger via the trigger context.
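
To make the separation concrete, here is a rough, self-contained sketch with no Flink dependency. It is a simplified model, not Flink's actual implementation: the class KeyedWindowSketch, its nested GlobalWindow stand-in, and the nested-map layout of the state are all hypothetical names and choices made for illustration. The assigner ignores the element and always returns the one global window, while the per-key separation comes entirely from the state being looked up under the current key.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedWindowSketch {

    // Stand-in for Flink's GlobalWindow: one shared instance (hypothetical simplification).
    static final class GlobalWindow {
        private static final GlobalWindow INSTANCE = new GlobalWindow();

        static GlobalWindow get() {
            return INSTANCE;
        }
    }

    // Mirrors the shape of GlobalWindows.assignWindows: the element is ignored
    // and the single global window is returned for every input.
    static Collection<GlobalWindow> assignWindows(Object element, long timestamp) {
        return Collections.singletonList(GlobalWindow.get());
    }

    // Assumed model of keyed window state: key -> window -> buffered values.
    private final Map<String, Map<GlobalWindow, List<Integer>>> state = new HashMap<>();

    // The key is supplied by the surrounding "runtime", not by the assigner.
    void processElement(String currentKey, int value, long timestamp) {
        for (GlobalWindow window : assignWindows(value, timestamp)) {
            state.computeIfAbsent(currentKey, k -> new HashMap<>())
                 .computeIfAbsent(window, w -> new ArrayList<>())
                 .add(value);
        }
    }

    // Returns the values buffered in the global window for the given key.
    List<Integer> contents(String key) {
        Map<GlobalWindow, List<Integer>> windows = state.get(key);
        if (windows == null) {
            return new ArrayList<>();
        }
        List<Integer> values = windows.get(GlobalWindow.get());
        return values == null ? new ArrayList<>() : values;
    }

    public static void main(String[] args) {
        KeyedWindowSketch operator = new KeyedWindowSketch();
        operator.processElement("a", 1, 0L);
        operator.processElement("b", 2, 1L);
        operator.processElement("a", 3, 2L);

        System.out.println("a -> " + operator.contents("a")); // a -> [1, 3]
        System.out.println("b -> " + operator.contents("b")); // b -> [2]
    }
}
```

Both keys share the same GlobalWindow instance, yet their contents stay separate because the state lookup goes through the key first. That is the sense in which the docs say elements with the same key end up in the same global window.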