I'm looking for a way to use Spark on Dataproc built with Scala 2.11. I want to use 2.11 since my jobs pulls in ~10 BigQuery tables and I'm using the new reflection libraries to map the corresponding objects to case classes. (There's a bug with the new reflection classes and concurrency which is only fixed in Scala 2.11) I've tried working around this issues by setting executor-cores to 1 but the performance decrease is painful. Is there a better way?
1 Answers
In general, setting executor-cores to 1 is a reasonable way to work around concurrency issues, since it can often happen that third-party libraries you may incorporate into your Spark jobs also have thread-safety problems; the key here is that you should be able to resize the executors to each only have 1 core without really sacrificing performance (the larger scheduling overhead and yarn overhead might mean o the order of, say ~10% performance decrease, but certainly nothing unmanageable).
I'm assuming you're referring to some multiplicative factor performance decrease due to, say, only using 2 out of 8 cores on an 8-core VM (Dataproc packs 2 executors per VM by default). The way to fix this is simply to also adjust spark.executor.memory
down proportionally to match up with the 1 core. For example, in your cluster config (gcloud dataproc clusters describe your-cluster-name
) if you use 4-core VMs you might see something like:
spark:spark.executor.cores: '2'
spark:spark.executor.memory: 5586m
YARN packs entirely based on memory, not cores, so this means 5586m
is designed to fit in half a YARN node, and thus correspond to 2 cores. If you turn up your cluster like:
gcloud dataproc clusters create \
--properties spark:spark.executor.cores=1,spark:spark.executor.memory=2000m
Then you should end up with a setup which still uses all the cores, but without concurrency issues (one worker thread in each executor process only).
I didn't just use 5586/2
in this case because you have to factor in spark:spark.yarn.executor.memoryOverhead
as well, so basically you have to add in the memoryOverhead, then divide by two, then subtract the memoryOverhead again to determine the new executor size, and beyond that the allocations also round to the next multiple of a base chunk size, which I believe is 512m.
In general, you can use trial-and-error by starting a bit lower on the memory allocation per core, and then increasing it if you find you need more memory headroom.
You don't have to redeploy a cluster to check these either; you can specify these at job submission time instead for faster turnaround:
gcloud dataproc jobs submit spark \
--properties spark.executor.cores=1,spark.executor.memory=2000m