
I'm using Apache Flink for stream processing.

After consuming messages from a source (e.g. Kafka, AWS Kinesis Data Streams) and applying transformations, aggregations, etc. with Flink operators on the streaming data, I want to buffer the final messages (e.g. 1000 at a time) and post each batch in a single request to an external REST API.

How can I implement a buffering mechanism (grouping every 1000 records into a batch) in Apache Flink?

Flink pipeline: streaming source --> transform/reduce using operators --> buffer 1000 messages --> post to REST API

Appreciate your help!


1 Answer


I'd create a sink with state that holds on to the messages passed in. When the count gets high enough (1000), the sink sends the batch. The state can be in memory (e.g. an instance variable holding an ArrayList of messages), but you should use checkpoints so that the state can be recovered after a failure.
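For the buffering itself, the skeleton of such a sink could look roughly like this (a sketch; HttpBatchSink and postBatch are made-up names, and the CheckpointedFunction methods shown further down belong in the same class):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical sink that buffers incoming messages and posts them in batches of 1000
public class HttpBatchSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private static final int BATCH_SIZE = 1000;

    // In-memory buffer of messages that have not been posted yet
    private ArrayList<String> buffer = new ArrayList<>();

    // Operator state used to recover the buffer after a failure
    // (filled in by snapshotState/initializeState below)
    private transient ListState<HttpSinkStateItem> checkpointedState;

    @Override
    public void invoke(String value, Context context) throws Exception {
        buffer.add(value);
        if (buffer.size() >= BATCH_SIZE) {
            postBatch(buffer);   // one HTTP request containing the whole batch
            buffer.clear();
        }
    }

    private void postBatch(List<String> batch) {
        // POST the batch to the external REST API with your HTTP client of choice
    }

    // snapshotState(...) and initializeState(...) as shown below
}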

When your sink has checkpointed state, it needs to implement CheckpointedFunction (in org.apache.flink.streaming.api.checkpoint), which means adding two methods to your sink:

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {

    checkpointedState.clear();

    // HttpSinkStateItem is a user-written class 
    // that just holds a collection of messages (Strings, in this case)
    //
    // Buffer is declared as ArrayList<String>

    checkpointedState.add(new HttpSinkStateItem(buffer));

}
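For reference, HttpSinkStateItem can be a trivial POJO that wraps the pending messages; the only thing the code here relies on is the getPending() accessor (again, just a sketch):

import java.util.ArrayList;
import java.util.List;

// Holds the messages that were buffered but not yet posted at checkpoint time
public class HttpSinkStateItem {

    private List<String> pending;

    // no-arg constructor so Flink can treat this as a POJO
    public HttpSinkStateItem() {
        this.pending = new ArrayList<>();
    }

    public HttpSinkStateItem(List<String> pending) {
        this.pending = new ArrayList<>(pending);
    }

    public List<String> getPending() {
        return pending;
    }

    public void setPending(List<String> pending) {
        this.pending = pending;
    }
}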

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {

    // Mix and match different kinds of states as needed:
    //   - Use context.getOperatorStateStore() to get basic (non-keyed) operator state
    //        - types are list and union        
    //   - Use context.getKeyedStateStore() to get state for the current key (only for processing keyed streams)
    //        - types are value, list, reducing, aggregating and map
    //   - Distinguish between state data using state name (e.g. "HttpSink-State")      

    ListStateDescriptor<HttpSinkStateItem> descriptor =
        new ListStateDescriptor<>(
            "HttpSink-State",
            HttpSinkStateItem.class);

    checkpointedState = context.getOperatorStateStore().getListState(descriptor);

    if (context.isRestored()) {

        buffer = new ArrayList<>();
        for (HttpSinkStateItem item : checkpointedState.get()) {
            // merge all restored items (there can be more than one after rescaling)
            buffer.addAll(item.getPending());
        }

    }

}
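Wiring it into your pipeline then looks roughly like this, using the hypothetical HttpBatchSink from the sketch above and assuming the transformed stream produces Strings; checkpointing must be enabled, otherwise there is no state to restore:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);   // checkpoint every 60s so the buffer is snapshotted

        DataStream<String> results = env
                .fromElements("msg1", "msg2", "msg3")   // stand-in for your Kafka/Kinesis source
                .map(String::toUpperCase);              // stand-in for your transform/reduce operators

        results.addSink(new HttpBatchSink());

        env.execute("stream-to-rest-batches");
    }
}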

You can also use a timer (if the input stream is keyed/partitioned) to flush partial batches periodically when the count doesn't reach your threshold; one way to do that is sketched below.
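A sketch of that approach, with made-up names and assuming the stream is keyed (e.g. by tenant or partition id): do the batching in a KeyedProcessFunction in front of the sink, emit each batch as a List<String>, and register a processing-time timer so partial batches still get flushed. The downstream sink then only needs to POST whatever list it receives.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical: collects elements per key and emits them as a List<String>
// either once 1000 have accumulated or 30s after the first one arrived.
public class BatchingFunction extends KeyedProcessFunction<String, String, List<String>> {

    private static final int BATCH_SIZE = 1000;
    private static final long TIMEOUT_MS = 30_000;

    private transient ListState<String> pending;
    private transient ValueState<Integer> count;

    @Override
    public void open(Configuration parameters) {
        pending = getRuntimeContext().getListState(
                new ListStateDescriptor<>("pending", String.class));
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Integer.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<List<String>> out) throws Exception {
        pending.add(value);
        int n = (count.value() == null ? 0 : count.value()) + 1;
        count.update(n);

        if (n == 1) {
            // first element of a new batch: schedule a flush in case we never reach 1000
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + TIMEOUT_MS);
        }
        if (n >= BATCH_SIZE) {
            flush(out);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<String>> out) throws Exception {
        flush(out);   // emit whatever is buffered, even if below the threshold
    }

    private void flush(Collector<List<String>> out) throws Exception {
        List<String> batch = new ArrayList<>();
        Iterable<String> values = pending.get();
        if (values != null) {
            values.forEach(batch::add);
        }
        if (!batch.isEmpty()) {
            out.collect(batch);   // the sink posts this list in a single REST request
        }
        pending.clear();
        count.clear();
    }
}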