I'd create a sink with state that holds on to the incoming messages. When the count gets high enough (e.g. 1000 messages), the sink sends the batch. The state can live in memory (e.g. an instance variable holding an ArrayList of messages), but you should use checkpointing so that you can recover that state after a failure.
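The count-based batching itself doesn't depend on Flink. Here is a minimal sketch in plain Java. It assumes a hypothetical `BatchingBuffer` helper, which is not a Flink class, and a `Consumer` that stands in for the HTTP request. In a real sink, `add` would be called from the sink's `invoke` method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper (not a Flink API) illustrating "send once N messages have accumulated".
final class BatchingBuffer {
    private final int threshold;                 // e.g. 1000
    private final Consumer<List<String>> sender; // stands in for the HTTP call
    private final List<String> buffer = new ArrayList<>();

    BatchingBuffer(int threshold, Consumer<List<String>> sender) {
        if (threshold <= 0) {
            throw new IllegalArgumentException("threshold must be positive");
        }
        this.threshold = threshold;
        this.sender = sender;
    }

    // Called once per incoming message.
    void add(String message) {
        buffer.add(message);
        if (buffer.size() >= threshold) {
            flush();
        }
    }

    // Sends everything buffered as one batch, then starts over with an empty buffer.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sender.accept(new ArrayList<>(buffer)); // pass a copy so clearing doesn't affect the receiver
        buffer.clear();
    }

    int pending() {
        return buffer.size();
    }
}
```

With a threshold of 3, seven messages produce two full batches and leave one message pending until the next flush.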
When your sink has checkpointed state, it needs to implement CheckpointedFunction (in org.apache.flink.streaming.api.checkpoint), which means adding two methods to your sink:
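```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;

// Inside the sink class:
private final List<String> buffer = new ArrayList<>();            // messages not yet sent
private transient ListState<HttpSinkStateItem> checkpointedState; // ListState isn't serializable, so transient

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    checkpointedState.clear();
    // HttpSinkStateItem is a user-written class
    // that just holds a collection of messages (Strings, in this case)
    checkpointedState.add(new HttpSinkStateItem(buffer));
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // Mix and match different kinds of state as needed:
    // - Use context.getOperatorStateStore() to get basic (non-keyed) operator state
    //   - types are list and union
    // - Use context.getKeyedStateStore() to get state for the current key (only for processing keyed streams)
    //   - types are value, list, reducing, aggregating and map
    // - Distinguish between state data using the state name (e.g. "HttpSink-State")
    ListStateDescriptor<HttpSinkStateItem> descriptor =
        new ListStateDescriptor<>(
            "HttpSink-State",
            HttpSinkStateItem.class);

    checkpointedState = context.getOperatorStateStore().getListState(descriptor);

    if (context.isRestored()) {
        // After rescaling, one parallel instance may receive several items,
        // so append each one instead of overwriting the buffer
        for (HttpSinkStateItem item : checkpointedState.get()) {
            buffer.addAll(item.getPending());
        }
    }
}
```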
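The answer leaves `HttpSinkStateItem` to the reader. The sketch below shows one possible shape for it. It also includes a Flink-free model of the snapshot/restore round trip, where a plain `List` stands in for `ListState` so it runs without a cluster. `SinkStateModel` is hypothetical, and in a real job Flink must be able to serialize `HttpSinkStateItem`. The point of the model is that list operator state can be redistributed across parallel instances on restore. One instance may therefore get several items, so the restore step should append all of them rather than keep only the last.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical shape of the user-written state class: a snapshot of pending messages.
class HttpSinkStateItem implements Serializable {
    private final List<String> pending;

    HttpSinkStateItem(List<String> pending) {
        this.pending = new ArrayList<>(pending); // copy, so later changes to the live buffer don't leak in
    }

    List<String> getPending() {
        return pending;
    }
}

// Flink-free model of snapshotState()/initializeState(); a List stands in for ListState.
class SinkStateModel {
    List<String> buffer = new ArrayList<>();

    // Mirrors snapshotState(): the state holds exactly one item with the current buffer.
    List<HttpSinkStateItem> snapshot() {
        List<HttpSinkStateItem> state = new ArrayList<>();
        state.add(new HttpSinkStateItem(buffer));
        return state;
    }

    // Mirrors the isRestored() branch: append every restored item.
    void restore(Iterable<HttpSinkStateItem> restored) {
        buffer = new ArrayList<>();
        for (HttpSinkStateItem item : restored) {
            buffer.addAll(item.getPending());
        }
    }
}
```

The model simulates two instances merging into one. Both snapshots' messages survive, and a message added after a snapshot is not part of it.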
You can also use a timer to send periodically when the count doesn't reach your threshold. Timers require a keyed/partitioned input stream; in the DataStream API they are registered through the TimerService of a KeyedProcessFunction.
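Here is a Flink-free model of the "send at N messages or after a timeout, whichever comes first" policy. It assumes a hypothetical `CountOrTimeoutBatcher` class, and times are passed in explicitly so the behavior is deterministic. In Flink, the arming step would roughly correspond to `ctx.timerService().registerProcessingTimeTimer(...)` in a KeyedProcessFunction, and `onTime` to its `onTimer` callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model (not a Flink API): flush when maxCount is reached or the timeout expires.
class CountOrTimeoutBatcher {
    private final int maxCount;
    private final long timeoutMillis;
    private final Consumer<List<String>> sender; // stands in for the HTTP call
    private final List<String> buffer = new ArrayList<>();
    private long timerAt = -1; // -1 means no timer is armed

    CountOrTimeoutBatcher(int maxCount, long timeoutMillis, Consumer<List<String>> sender) {
        this.maxCount = maxCount;
        this.timeoutMillis = timeoutMillis;
        this.sender = sender;
    }

    void add(String message, long nowMillis) {
        if (buffer.isEmpty()) {
            timerAt = nowMillis + timeoutMillis; // arm the timer with the first message of a batch
        }
        buffer.add(message);
        if (buffer.size() >= maxCount) {
            flush();
        }
    }

    // Called as time advances; fires the timer if it is due.
    void onTime(long nowMillis) {
        if (timerAt >= 0 && nowMillis >= timerAt) {
            flush();
        }
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            sender.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
        timerAt = -1; // disarm; the next message re-arms it
    }
}
```

In Flink, a timer registered for a batch that was already sent because of the count would still fire. Checking for an empty buffer makes such a stale timer harmless. Alternatively, you can remove it with `deleteProcessingTimeTimer`.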