4 votes

My PySpark application runs a UDF over a 106.36 MB dataset (817,270 records), which takes about 100 hours with regular Python lambda functions. I have spawned a Google Dataproc cluster with 20 worker nodes with 8 vCPUs each. However, upon execution only 3 nodes and 3 vCPUs in total are used. Obviously, I would like the cluster to use all the resources that I make available.

The default number of partitions of my resulting dataframe is 8. I tried repartitioning it to 100, but the cluster keeps using only 3 nodes and 3 vCPUs. Also, when I check the number of executors that Spark sees, it reports only 3.

This is the PySpark code that gets executed:

from pyspark.sql.types import StringType, MapType
from pyspark.sql.functions import udf

customer_names = spark.createDataFrame(customer_names)

# Wrap the Python function embargoMatch in a UDF that returns a map of strings
embargo_match_udf = udf(lambda x, y: embargoMatch(x, y), MapType(StringType(), StringType()))
customer_names = customer_names.withColumn('JaroDistance', embargo_match_udf('name', 'customer_code'))

# Pull the 'max_jaro' entry out of the returned map
result = customer_names.withColumn('jaro_similarity', customer_names.JaroDistance['max_jaro'])

# Write the result to Google Cloud Storage as CSV
result.write.format("com.databricks.spark.csv").save('gs://charles-embargo-bucket/sparkytuesday')


Here is some Spark output as seen from my Jupyter notebook:

print(sc) -> <SparkContext master=yarn appName=PySparkShell>
print(result.rdd.getNumPartitions()) -> 8
result = result.repartition(100)
print(result.rdd.getNumPartitions()) -> 100
sc._jsc.sc().getExecutorMemoryStatus().size() -> 3

2 Answers

5 votes

For those interested in how I solved the issue:

By default, my Spark context assumed two worker nodes, no matter how many extra nodes I spawned in Google Cloud's Dataproc UI. I therefore reconfigured the Spark context manually as follows:

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.conf import SparkConf

# Stop the default context, set the desired executor configuration,
# then start a fresh context and session
sc.stop()
SparkContext.setSystemProperty('spark.executor.cores', '4')
SparkContext.setSystemProperty('spark.executor.instances', '5')
sc = SparkContext("yarn", "embargotest")
spark = SparkSession.builder.appName('embargotest').getOrCreate()

In addition, I explicitly repartitioned the customer_names dataset into 20 partitions (4 cores x 5 instances) before applying .withColumn to the dataframe:

customer_names = spark.createDataFrame(customer_names).repartition(20)
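As a quick sanity check, a minimal sketch (reusing the executor-count call from the question) to verify that the new context actually sees the expected resources before running the expensive UDF:

# getExecutorMemoryStatus() includes the driver, so with 5 executor
# instances this should report 6 entries
print(sc._jsc.sc().getExecutorMemoryStatus().size())
# Expect 20 after the explicit repartition above
print(customer_names.rdd.getNumPartitions())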

Hope this can help someone with a similar issue!

1 vote

Additionally, you may want to try the following to have PySpark dynamically adjust the number of executors in your application via Dynamic Allocation:

SparkContext.setSystemProperty("spark.dynamicAllocation.enabled", "true")