2
votes

Context
Spark provides RDDs for which map functions can be used to lazily set up the operations for processing in parallel. RDD's can be created with a specified partitioning parameter that determines how many partitions to create per RDD, preferably this parameter equals the number of systems (Ex. You have 12 files to process, create an RDD with 3 partitions which splits the data into buckets of 4 each for 4 systems and all the files get processed concurrently in each system). It is my understand that these partitions control the portion of data that goes to each system for processing.

Issue
I need to fine tune and control how many functions run at same time per system. If 2 or more functions run on same GPU at the same time, the system will crash.

Question
If an RDD is not evenly nicely split (like in the example above), how many threads run concurrently on the system?

Example
In:

sample_files = ['one.jpg','free.jpg','two.png','zero.png',
                'four.jpg','six.png','seven.png','eight.jpg',
                'nine.png','eleven.png','ten.png','ten.png',
                'one.jpg','free.jpg','two.png','zero.png',
                'four.jpg','six.png','seven.png','eight.jpg',
                'nine.png','eleven.png','ten.png','ten.png',
                'eleven.png','ten.png']
CLUSTER_SIZE = 3
example_rdd = sc.parallelize(sample_files, CLUSTER_SIZE)
example_partitions = example_rdd.glom().collect()

# Print elements per partition
for i, l in enumerate(example_partitions): print "parition #{} length: {}".format(i, len(l))
# Print partition distribution
print example_partitions

# How many map functions run concurrently when the action is called on this Transformation?
example_rdd.map(lambda s: (s, len(s))
action_results = example_rdd.reduceByKey(add)

Out:

parition #0 length: 8
parition #1 length: 8
parition #2 length: 10
[ ['one.jpg', 'free.jpg', 'two.png', 'zero.png', 'four.jpg', 'six.png', 'seven.png', 'eight.jpg'], 
['nine.png', 'eleven.png', 'ten.png', 'ten.png', 'one.jpg', 'free.jpg', 'two.png', 'zero.png'], 
['four.jpg', 'six.png', 'seven.png', 'eight.jpg', 'nine.png', 'eleven.png', 'ten.png', 'ten.png', 'eleven.png', 'ten.png'] ]

In Conclusion
What I need to know, is if the RDD is split the way it is, what controls how many threads are processed simultaneously? Is it the number of cores, or is there a global parameter that can be set so it only processes 4 at a time on each partition (system)?

1
Have you tried looking at the Spark UI? After running your example, I get 3 tasks for each collect() action, which is the number of partitions you provided at the creation of the RDD. Outside of that, the number of tasks running in parallel depends on the Spark application configuration. For example, if you started with pyspark --master local[8] and you have at least 8 cores, it will be able to parallelize 8 tasks at once. - user197030
I have not since the jobs run pretty quickly. I will take a look, but I thought the UI would give a top level view of how many workers were running not how many concurrent thread was being processed in each worker. Ok so then you are saying if I limit the parameter to --master local[4] it should only parallelize 4 tasks at a time? I'm going to play around with it and see, thanks. - alfredox

1 Answers

1
votes

In what order does data get process from RDDs in Spark?

Unless it is some border case, like only one partition, order is arbitrary or nondeterministic. This will depend on the cluster, on the data and on different runtime events.

A number of partitions sets only a limit of overall parallelism for a given stage or in other words it is a minimal unit of parallelism in Spark. No matter how much resources you allocate you a single stage should process more data than at the time. Once again there can be border cases when worker is not accessible and task is rescheduled on another machine.

Another possible limit you can think of is the number of the executor threads. Even if you increase the number of partitions a single executor thread will process only one at the time.

Neither of the above tell you where or when given partition will be processed. While you can use some dirty, inefficient and non-portable tricks at the configuration level (like single worker with a single executor thread per machine) to make sure that only a one partition is processed on a given machine at the time it is not particularly useful in general.

As a rule of thumb I would say that Spark code should never be concerned wit a time an place it is executed. There are some low level aspects of the API which provides means to set partition specific preferences but as far as I know these don't provide hard guarantees.

That being said one can think of at least few ways you can approach this problem:

  • long running executor threads with configuration level guarantees - it could be acceptable if Spark is responsible only for loading and saving data
  • singleton objects which control queuing jobs on the GPU
  • delegating GPU processing to specialized service which ensures proper access

On a side not you may be interested in Large Scale Distributed Deep Learning on Hadoop Clusters which roughly describes an architecture which can be applicable here.