
I use a KeyedStream to shard a stream of events by key. Each keyed operator emits events that then need to be recombined with the events from all the other keyed operators to form a single graph held in Flink state.

The graph then needs to be processed/searched, possibly emitting events downstream. I want the graph operator to scale horizontally, i.e. each parallel instance processes a subset of the graph (which requires every instance to have access to the whole graph). I'm interested in how I can spread the load across all parallel instances.

// key input events for processing by key
KeyedStream<MyEvent, String> keyedStream = myInputStream.keyBy(...); // KeyedStream<T, KEY>: key type assumed to be String
// process each keyed input stream and produce output events that need to be combined into a graph
SingleOutputStreamOperator<MyGraphEvent> graphStream = keyedStream.process(...);
// recombine into a single graph operator via broadcast(), then process
DataStream<MyOutputEvent> output = graphStream.broadcast().flatMap(new MyGraphFlatmapFunction());

I think I can use broadcast() to ensure that all outputs from each of the keyed operators are sent to every downstream operator instance.

MyGraphFlatmapFunction takes a stream of MyGraphEvent objects, builds a graph in its internal state and optionally produces a stream of MyOutputEvent objects. I want each parallel instance of the operator to process a subset of the graph. Irrespective of how many parallel instances of the operator exist, I'd like all of the graph to be processed (meaning I don't want each instance to just process some random subset of the graph), and I don't want parallel instances to process the same part of the graph (no duplicate processing).

I'd like to be able to do something within MyGraphFlatmapFunction like:

// I want to get this operator instance's index & the number of parallel instances in the topology
// (getOperatorIndex() and getTotalNumberOfParallelOperators() are placeholder names for what I'm looking for)
int operatorIndex = getOperatorIndex();
int operatorCount = getTotalNumberOfParallelOperators();
// process every operatorCount-th element, starting at operatorIndex
for (int index = 0; index < someCollection.size(); index++) {
    if (index % operatorCount == operatorIndex) {
        // do some processing
    }
}
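The modulo rule in the loop above gives each element exactly one owner, because for a fixed operatorCount every index has a single remainder. A plain-Java sketch of that idea (the class ModuloPartition and its indicesFor method are illustrative names, not part of Flink) steps from operatorIndex by operatorCount, which visits the same indices as the `%` check without testing every element:

```java
import java.util.ArrayList;
import java.util.List;

public class ModuloPartition {

    // Returns the indices in [0, size) that subtask `subtaskIndex` out of `parallelism` should process.
    public static List<Integer> indicesFor(int size, int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("subtaskIndex must be in [0, parallelism)");
        }
        List<Integer> result = new ArrayList<>();
        // Start at this subtask's index and step by the parallelism:
        // equivalent to keeping every i with i % parallelism == subtaskIndex.
        for (int i = subtaskIndex; i < size; i += parallelism) {
            result.add(i);
        }
        return result;
    }

    public static void main(String[] args) {
        int size = 10;
        int parallelism = 3;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("subtask " + subtask + " -> " + indicesFor(size, subtask, parallelism));
        }
    }
}
```

With 10 elements and 3 subtasks, main prints [0, 3, 6, 9], [1, 4, 7] and [2, 5, 8], which together cover every index exactly once. Note that this only holds if every instance sees the collection in the same order.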

Is there a way of knowing how many instances of the parallel operator exist and which operator this is? Is there another way of achieving what I'm after?


1 Answer


If you use a RichFlatMapFunction, you get access to the RuntimeContext via getRuntimeContext(). The RuntimeContext has the two methods you need:

  • getNumberOfParallelSubtasks()
  • getIndexOfThisSubtask()
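Combining those two methods with the broadcast pipeline from the question, here is a minimal sketch, assuming the Flink 1.x API (recent Flink releases deprecate these methods in favour of the equivalents on getRuntimeContext().getTaskInfo()). To keep it self-contained, graph events are simplified to String node ids, and the class PartitionedGraphFlatMap and its owns() helper are hypothetical. Ownership is decided by a hash of the node id rather than by a position in a list: with several upstream subtasks, the broadcast copies may reach each downstream instance in a different interleaving, so list positions are not guaranteed to agree across instances, whereas String.hashCode() is the same everywhere.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: every parallel instance receives every broadcast event and keeps a
// full copy of the graph, but only processes the nodes it owns.
// Graph events are simplified to String node ids instead of MyGraphEvent/MyOutputEvent.
public class PartitionedGraphFlatMap extends RichFlatMapFunction<String, String> {

    // Simplified graph: node id -> number of times the node has been seen.
    // A plain field is NOT checkpointed; real code would keep the graph in Flink managed state.
    private final Map<String, Integer> graph = new HashMap<>();

    // Deterministic owner assignment: String.hashCode() is specified by the JDK, so every
    // instance computes the same owner for a node regardless of arrival order.
    public static boolean owns(String nodeId, int subtaskIndex, int parallelism) {
        return Math.floorMod(nodeId.hashCode(), parallelism) == subtaskIndex;
    }

    @Override
    public void flatMap(String nodeId, Collector<String> out) {
        // The two RuntimeContext methods from the answer (Flink 1.x API).
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();

        // Every instance updates its own full copy of the graph...
        graph.merge(nodeId, 1, Integer::sum);

        // ...but only processes the nodes it owns, so each node is handled by exactly one instance.
        for (Map.Entry<String, Integer> node : graph.entrySet()) {
            if (owns(node.getKey(), subtaskIndex, parallelism)) {
                // "do some processing": here we just emit a description of the owned node
                out.collect("subtask " + subtaskIndex + " owns " + node.getKey()
                        + " (seen " + node.getValue() + "x)");
            }
        }
    }
}
```

With graph events mapped to node-id strings, something like graphStream.broadcast().flatMap(new PartitionedGraphFlatMap()).setParallelism(4) would give each of the four instances a full copy of the graph and a disjoint share (roughly a quarter) of the node ids to process.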

Hope this helps.