While the answer provided by KrisP highlights all the important differences, I think it is worth noting that `mapPartitions` is just a low-level building block behind higher-level transformations, not a method to achieve shared state. Although `mapPartitions` can be used to make shared-like state explicit, it is technically not shared (its lifetime is limited to the `mapPartitions` closure), and there are other means to achieve it. In particular, variables which are referenced inside closures are shared inside a partition. To illustrate that, let's play a little with singletons:
object DummySharedState {
  var i = 0L
  def get(x: Any) = {
    i += 1L
    i
  }
}
sc.parallelize(1 to 100, 1).map(DummySharedState.get).max
// res3: Long = 100
sc.parallelize(1 to 100, 2).map(DummySharedState.get).max
// res4: Long = 50
sc.parallelize(1 to 100, 50).map(DummySharedState.get).max
// res5: Long = 2
and a similar thing in PySpark:
singleton module `dummy_shared_state.py`:
i = 0

def get(x):
    global i
    i += 1
    return i
main script:
from pyspark import SparkConf, SparkContext
import dummy_shared_state

master = "spark://..."
conf = (SparkConf()
    .setMaster(master)
    .set("spark.python.worker.reuse", "false"))
sc = SparkContext(conf=conf)  # create the context so the configuration above actually takes effect
sc.addPyFile("dummy_shared_state.py")
sc.parallelize(range(100), 1).map(dummy_shared_state.get).max()
## 100
sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
## 50
Please note that the `spark.python.worker.reuse` option is set to `false`. If you keep the default value, you'll actually see something like this:
sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
## 50
sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
## 100
sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
## 150
At the end of the day you have to distinguish between three different things:
- broadcast variables, which are designed to reduce network traffic and memory footprint by keeping a copy of the variable on the worker instead of shipping it with each task (see the sketch after this list)
- variables defined outside the closure and referenced inside the closure, which have to be shipped with each task and are shared within that task
- variables defined inside the closure, which are not shared at all
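For the first case, a minimal sketch of a broadcast variable, assuming the same `sc` as above (the lookup map here is made up purely for illustration):

val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two", 3 -> "three"))  // shipped to each worker once, not with every task
sc.parallelize(1 to 3).map(i => lookup.value.getOrElse(i, "?")).collect()
// Array(one, two, three)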
On top of that, there are some Python-specific gotchas related to the use of persistent interpreters.
Still, there is no practical difference between `map` (`filter` or another transformation) and `mapPartitions` when it comes to variable lifetime.
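To see that, here is a sketch reusing `DummySharedState` from above with `mapPartitions`; assuming a fresh session, the counter should end up exactly where the `map`-based run with the same number of partitions did:

sc.parallelize(1 to 100, 2)
  .mapPartitions(iter => iter.map(DummySharedState.get))  // same singleton, same per-partition lifetime
  .max
// expected: 50, just like the map version with two partitions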