
I have a workflow constructed in Flink that consists of a custom source, a series of maps/flatmaps and a sink.

The run() method of my custom source iterates through the files stored in a folder and collects, through the collect() method of the context, the name and the contents of each file (I have a custom object that stores this info in two fields).

I then have a series of maps/flatmaps transforming these objects, which are then written to files by a custom sink. The execution graph, as produced in Flink's Web UI, is the following:

Flink execution graph

I have a cluster of 2 workers, each set up with 6 slots (they both have 6 cores, too). I set the parallelism to 12. From the execution graph I see that the parallelism of the source is 1, while the rest of the workflow has parallelism 12.

When I run the workflow (I have around 15K files in the dedicated folder), I monitor the resources of my workers using htop. All the cores sit at close to 100% utilisation most of the time, but roughly every 30 minutes or so, 8-10 of the cores go idle for about 2-3 minutes.

My questions are the following:

  1. I understand that the source runs with parallelism 1, which I believe is normal when reading from local storage (my files are located in the same directory on each worker, as I don't know which worker will be selected to execute the source). Is this indeed normal? Could you please explain why this is the case?

  2. The rest of my workflow is executed with parallelism 12, which appears to be correct, since the task managers' logs show prints from all the slots (e.g., .... [Flat Map -> Map -> Map -> Sink: Unnamed (**3/12**)] INFO ...., .... [Flat Map -> Map -> Map -> Sink: Unnamed (**5/12**)] INFO ...., etc.). What I don't understand, though, is this: if one slot is executing the source and I have 12 slots in my cluster, how is the rest of the workflow executed by 12 slots? Is one slot acting as both the source and one instance of the rest of the workflow? If so, how are the resources for this specific slot allocated? Could someone explain the steps taking place in this workflow? For example (this might be wrong):

  • Slot 1 reads files and forwards them to available slots (2 to 12)
  • Slot 1 forwards one file to itself and stops reading until it finishes its job
  • When done, slot 1 reads more files and forwards them to slots that became available

I believe what I describe above is wrong, but I give it as an example to better explain my question.

  3. Why do the majority of the cores go idle for about 3 minutes roughly every 30 minutes?

2 Answers

1 vote

To answer the specific question about parallelizing your read, I would do the following...

  1. Implement your custom source by extending RichParallelSourceFunction (a source that does not implement ParallelSourceFunction can only run with parallelism 1, so it cannot be scaled out).
  2. In your open() method, call getRuntimeContext().getNumberOfParallelSubtasks() to get the total parallelism and call getRuntimeContext().getIndexOfThisSubtask() to get the index of the sub-task being initialized.
  3. In your run() method, as you iterate over files, get the hashCode() of each file name, modulo the total parallelism. If this is equal to your sub-task's index, then you process it.

In this way you can spread the work out over 12 sub-tasks, without having sub-tasks try to process the same file.
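
A minimal sketch of this approach, assuming the file name and contents are emitted as a Tuple2<String, String> (standing in for the asker's two-field custom object); the class name and input directory are placeholders:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

public class PartitionedFileSource extends RichParallelSourceFunction<Tuple2<String, String>> {

    private final String inputDir;      // directory visible on every worker
    private transient int parallelism;
    private transient int subtaskIndex;
    private volatile boolean running = true;

    public PartitionedFileSource(String inputDir) {
        this.inputDir = inputDir;
    }

    @Override
    public void open(Configuration parameters) {
        // Each parallel instance learns the total parallelism and its own index.
        parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
    }

    @Override
    public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
        File[] files = new File(inputDir).listFiles();
        if (files == null) {
            return;
        }
        for (File file : files) {
            if (!running) {
                break;
            }
            // Only claim files whose name hashes to this sub-task's index,
            // so no two sub-tasks read the same file (floorMod handles negative hash codes).
            if (Math.floorMod(file.getName().hashCode(), parallelism) == subtaskIndex) {
                String contents = new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
                ctx.collect(Tuple2.of(file.getName(), contents));
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

With the source made parallel this way, it can be given the same parallelism as the rest of the job (12 in this case).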

0 votes
  1. The single-consumer setup limits the overall throughput of your pipeline to the performance of that one consumer. Additionally, it introduces a heavy shuffle to all slots - in this case, all the data read by the consumer also gets serialized on the consumer's slot, which is an additional CPU load. In contrast, having the source parallelism equal to the map/flatMap parallelism would allow Flink to chain the source -> map operations and avoid the shuffle (see the sketch after this list).
  2. By default, Flink allows subtasks to share slots even if they are subtasks of different tasks, as long as they are from the same job. The result is that one slot may hold an entire pipeline of the job. So, in your case, slot 1 has both the source and the map/flatMap tasks, while the other slots have only map/flatMap tasks. See here for more details: https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/runtime.html#task-slots-and-resources. You can also view the instances of each subtask in the Web UI.
  3. Do you have checkpointing enabled? If yes, and the interval is around 30 minutes, then that is probably when the state gets snapshotted, which would explain the periodic idle phase.
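
A minimal sketch of points 1 and 3 together, assuming the hypothetical PartitionedFileSource from the other answer; the input path, the 30-minute checkpoint interval, and the map/sink are placeholders standing in for the asker's real operators:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(12);

        // If checkpointing is on with a ~30-minute interval, the periodic
        // snapshots would line up with the observed idle periods.
        env.enableCheckpointing(30 * 60 * 1000);

        env.addSource(new PartitionedFileSource("/data/input"))
                // Same parallelism as the downstream operators, so Flink can
                // chain source -> map into one task and skip the shuffle.
                .setParallelism(12)
                .map(new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(Tuple2<String, String> value) {
                        return value; // placeholder for the real maps/flatMaps
                    }
                })
                .print(); // placeholder for the custom sink

        env.execute("file processing job");
    }
}
```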