2 votes

I have a Flink streaming application that needs the ability to 'pause' and 'unpause' processing on a specific keyed stream. 'Processing' here just means performing some simple anomaly detection on the stream.

The flow we are thinking about works like this:

A stream of commands, either ProcessCommand, PauseCommand, or ResumeCommand, each with an id that is used for keyBy.

ProcessCommands will be checked against the key's paused state before being processed: processed if the key is not paused, buffered if it is.

PauseCommands will pause the processing of a key.

ResumeCommands will unpause the processing of a key and flush the buffer through.

Does this flow seem reasonable and, if so, would I be able to use something like the split operator to achieve it?

Example stream, individual record timestamps omitted:

[{command: process, id: 1}, {command: pause, id: 1}, {command: process, id: 1}, {command: resume, id: 1}, {command: process, id: 1}]

Flow:
=>
{command: process, id: 1} # Sent downstream for analysis
=> 
{command: pause, id: 1} # Starts the buffer for id 1
=>
{command: process, id: 1} # Buffered into another output stream
=> 
{command: resume, id: 1} # Turns off buffering, flushes [{command: process, id: 1}] downstream
=>
{command: process, id: 1} # Sent downstream for processing as the buffering has been toggled off 
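The buffering semantics above can be modelled as a small per-key state machine, independent of Flink. A minimal sketch in plain Java (the class and method names here are mine, purely for illustration):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Models the pause/resume buffering for a single key.
class PauseBuffer {
    private boolean paused = false;
    private final Queue<String> buffer = new ArrayDeque<>();
    private final List<String> downstream = new ArrayList<>();

    void onCommand(String command) {
        switch (command) {
            case "pause":
                paused = true;              // start buffering subsequent process records
                break;
            case "resume":
                paused = false;             // flush everything buffered while paused
                while (!buffer.isEmpty()) {
                    downstream.add(buffer.poll());
                }
                break;
            default:                        // "process"
                if (paused) {
                    buffer.add(command);    // hold until a resume arrives
                } else {
                    downstream.add(command); // pass straight through
                }
        }
    }

    List<String> downstream() {
        return downstream;
    }
}
```

Replaying the example sequence (process, pause, process, resume, process) against this model yields three records downstream, with the second one delayed until the resume.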
So, your actual stream (which contains records along with id) will receive the commands as a part of the record and the stream processing will be controlled using these commands right? - shriyog
Yes, exactly right @shriyog. Each record has an id and a command. - austin_ce
What's the difference between process and pause commands? Can you please give an example showcasing the input command stream and expected behavior? - shriyog
@shriyog, do those edits do a better job explaining? Thank you for your help! - austin_ce

1 Answer

1 vote

This can be achieved using Flink's window operator. First, create a POJO- or tuple-based stream by applying a map operation.

Then, as per your needs, you can use keyBy on that stream to get a KeyedStream.

Now, by combining an effectively infinite window, a custom trigger, and a window function, you can achieve the switching behavior your command stream requires.

Basically, the window serves as your buffer: after a pause record is received, it holds the process records until a resume record arrives; if the key is not paused, each process record passes straight through. You implement this by writing a custom trigger that fires and purges the window (the buffer) according to these rules.
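Putting those pieces together, the job wiring might look roughly like this sketch (`PauseResumeTrigger`, `AnomalyDetectionFunction`, and the source/parse step are assumed names, not from the original post; the referenced repository may wire things differently):

```java
// Assumed: commands arrive as Tuple2<id, command> after a parsing map.
DataStream<Tuple2<Integer, String>> commands = env
        .addSource(commandSource)
        .map(raw -> parse(raw));                 // build the POJO/tuple stream

commands
    .keyBy(t -> t.f0)                            // one logical buffer per id
    .window(GlobalWindows.create())              // unbounded window acts as the buffer
    .trigger(new PauseResumeTrigger())           // decides when to hold or flush
    .process(new AnomalyDetectionFunction())     // runs only on flushed records
    .print();
```

`GlobalWindows` is one way to get a never-ending window; the trigger, not the window assigner, supplies all of the firing logic here.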

Following is a custom Trigger implementation with the onElement() method overridden. Note that a single Trigger instance is shared across keys, so the paused flag must live in partitioned (per-key) trigger state rather than in a plain field:

/**
 * We trigger the window processing according to the command inside the
 * record. Process records are buffered once a pause record is received,
 * and the buffer is flushed when a resume record arrives. If the key is
 * not paused, each process record fires (and purges) the window immediately.
 */
@Override
public TriggerResult onElement(Tuple2<Integer, String> element, long timestamp, Window window,
        TriggerContext context) throws Exception {
    // Per-key paused flag, kept in partitioned trigger state.
    ValueState<Boolean> paused = context.getPartitionedState(
            new ValueStateDescriptor<>("paused", Boolean.class));
    if (element.f1.equals("pause")) {
        paused.update(true);
        return TriggerResult.CONTINUE;
    } else if (element.f1.equals("resume")) {
        paused.update(false);
        return TriggerResult.FIRE_AND_PURGE;
    } else if (Boolean.TRUE.equals(paused.value())) { // value() is null until first set
        return TriggerResult.CONTINUE;
    }
    return TriggerResult.FIRE_AND_PURGE;
}

Check out the complete working example in this GitHub repository.