I have a KeyedStream that shards a stream of events by key. Each keyed stream emits events that then need to be recombined with the events from all the other keyed operators to form a single graph held in Flink state.
The graph then needs to be processed/searched and may emit events downstream. I want the graph operator to be horizontally scalable, i.e. each parallel instance processes a subset of the graph (which requires each instance to have access to the whole graph). I'm interested in how I can spread the load across all parallel instances.
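import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
// key input events for processing by key (MyKey is a placeholder for the key type)
KeyedStream<MyEvent, MyKey> keyedStream = myInputStream.keyBy(...);
// process each keyed input stream and produce output events that need to be combined into a graph
SingleOutputStreamOperator<MyGraphEvent> graphStream = keyedStream.process(...);
// send every graph event to all parallel instances via broadcast(), then process
DataStream<MyOutputEvent> output = graphStream.broadcast().flatMap(new MyGraphFlatmapFunction());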
I think I can use broadcast() to ensure that all outputs from each of the keyed operators are sent to every parallel instance of the downstream operator.
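To make the intended broadcast semantics concrete, here is a plain-Java simulation with no Flink dependency, in which every event is delivered to every parallel instance so that each instance ends up holding a full copy of the graph. GraphEdge and broadcastTo are hypothetical names for illustration, and the record syntax assumes Java 16 or later.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class BroadcastSketch {
    // Hypothetical stand-in for MyGraphEvent: one directed edge of the graph.
    record GraphEdge(String from, String to) {}

    // Simulates broadcast(): each event is copied into the local state of every instance.
    static List<Set<GraphEdge>> broadcastTo(List<GraphEdge> events, int parallelism) {
        List<Set<GraphEdge>> instanceState = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instanceState.add(new HashSet<>());
        }
        for (GraphEdge event : events) {
            for (Set<GraphEdge> local : instanceState) {
                local.add(event);
            }
        }
        return instanceState;
    }

    public static void main(String[] args) {
        List<GraphEdge> events = List.of(new GraphEdge("a", "b"), new GraphEdge("b", "c"));
        List<Set<GraphEdge>> state = broadcastTo(events, 3);
        // Every simulated instance holds the same, complete edge set.
        for (int i = 0; i < state.size(); i++) {
            System.out.println("instance " + i + " holds " + state.get(i).size() + " edges");
        }
    }
}
```

The trade-off this illustrates is that the graph state is replicated once per parallel instance; only the processing work, not the memory, can be divided.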
MyGraphFlatmapFunction takes a stream of MyGraphEvent objects, builds a graph in internal state and optionally produces a stream of MyOutputEvent objects. I want each parallel instance to process a subset of the graph. Irrespective of how many parallel instances of the operator exist, I'd like all of the graph to be processed (meaning I don't want each instance to just process some random subset of the graph), and I don't want parallel instances to process the same part of the graph (no duplicate processing).
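One way to meet the "all of the graph, no duplicates" requirement without depending on the order in which each instance built its local collection is to assign every graph element to an instance by a stable key, such as a node id. The plain-Java sketch below assumes each node has a String id (a hypothetical field) and checks that, for a given parallelism, every node is claimed by exactly one instance. The reason to prefer a key over a position is an assumption worth verifying: a broadcast instance receives events from several upstream channels, and the interleaving across channels is not guaranteed to be the same on every instance.

```java
import java.util.List;

public class StableOwnership {
    // Assigns a node to one instance from its id. String.hashCode() is specified by the JDK,
    // so every instance computes the same owner for the same id.
    static boolean isOwnedBy(String nodeId, int subtaskIndex, int parallelism) {
        return Math.floorMod(nodeId.hashCode(), parallelism) == subtaskIndex;
    }

    // Counts how many instances claim the given node; the target is exactly 1.
    static int claimCount(String nodeId, int parallelism) {
        int claims = 0;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            if (isOwnedBy(nodeId, subtask, parallelism)) {
                claims++;
            }
        }
        return claims;
    }

    public static void main(String[] args) {
        List<String> nodeIds = List.of("a", "b", "c", "d", "e");
        for (int parallelism = 1; parallelism <= 4; parallelism++) {
            for (String id : nodeIds) {
                System.out.println("p=" + parallelism + " node=" + id
                        + " claims=" + claimCount(id, parallelism));
            }
        }
    }
}
```

Math.floorMod keeps the result non-negative even when hashCode() is negative. How evenly the load is spread depends on the distribution of the ids; an even split is not guaranteed.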
I'd like to be able to do something like this within MyGraphFlatmapFunction:
// I want to get this operator's instance number and the number of parallel instances in the topology
// (getOperatorIndex() and getTotalNumberOfParallelOperators() are hypothetical helpers)
int operatorIndex = getOperatorIndex();
int operatorCount = getTotalNumberOfParallelOperators();
// process every nth object, where n is the number of parallel instances
for (int index = 0; index < someCollection.size(); index++) {
    if (index % operatorCount == operatorIndex) {
        // do some processing
    }
}
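For reference, here is the loop above as a self-contained method, with the instance index and instance count passed in as parameters rather than coming from the hypothetical helpers. Inside a Flink rich function such as a RichFlatMapFunction, these two values can be read from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks() (newer releases may expose them via getTaskInfo() instead, so check the Javadoc for your version). That wiring is not shown, so the sketch has no Flink dependency; itemsForSubtask is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

public class EveryNth {
    // Returns the items the given subtask should process: those whose position
    // modulo the parallelism equals the subtask index.
    static <T> List<T> itemsForSubtask(List<T> items, int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("subtaskIndex must be in [0, parallelism)");
        }
        List<T> mine = new ArrayList<>();
        // Stepping by parallelism from subtaskIndex visits exactly the positions
        // where index % parallelism == subtaskIndex.
        for (int index = subtaskIndex; index < items.size(); index += parallelism) {
            mine.add(items.get(index));
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> items = List.of("n0", "n1", "n2", "n3", "n4", "n5", "n6");
        int parallelism = 3;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("subtask " + subtask + " -> " + itemsForSubtask(items, subtask, parallelism));
        }
    }
}
```

With seven items and a parallelism of 3, subtask 0 gets n0, n3, n6; subtask 1 gets n1, n4; subtask 2 gets n2, n5. This position-based split only partitions correctly if every instance's copy of someCollection has the same elements in the same order.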
Is there a way of knowing how many parallel instances of the operator exist and which instance this is? Is there another way of achieving what I'm after?