I'd create a sink with state that would hold on to the messages that are passed in. When the count gets high enough (1000) the sink sends the batch. The state can be in memory (e.g. an instance variable holding an ArrayList of messages), but you should use checkpoints so that you can recover that state in case of a failure of some kind.
When your sink has checkpointed state, it needs to implement CheckpointedFunction (in org.apache.flink.streaming.api.checkpoint) which means you need to add two methods to your sink:
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
checkpointedState.add(new HttpSinkStateItem(buffer));
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<HttpSinkStateItem> descriptor =
new ListStateDescriptor<>(
"HttpSink-State",
HttpSinkStateItem.class);
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
for (HttpSinkStateItem item: checkpointedState.get()) {
buffer = new ArrayList<>(item.getPending());
}
}
}
You can also use a timer in the sink (if the input stream is keyed/partitioned) to send periodically if the count doesn't reach your threshold.