0 votes

My Spark job currently runs in 59 minutes, and I want to optimize it so that it takes less time. I have noticed that the last step of the job takes most of that time (55 minutes) (see the screenshots of the job in the Spark UI below).

I need to join a big dataset with a smaller one and apply transformations on the joined dataset (creating a new column).

At the end, I need the dataset repartitioned on the column PSP (see the code snippet below). I also perform a sort at the end (each partition is sorted on 3 columns).

All the details (infrastructure, configuration, code) can be found below.

Snippet of my code:

    spark.conf.set("spark.sql.shuffle.partitions", 4158)

    val uh = uh_months
      .withColumn("UHDIN", datediff(
        to_date(unix_timestamp(col("UHDIN_YYYYMMDD"), "yyyyMMdd").cast(TimestampType)),
        to_date(unix_timestamp(col("january"), "yyyy-MM-dd").cast(TimestampType))))
      .withColumn("DVA_1", date_format(col("DVA"), "dd/MM/yyyy"))
      .drop("UHDIN_YYYYMMDD")
      .drop("january")
      .drop("DVA")
      .persist()

    val uh_flag_comment = new TransactionType().transform(uh)
    uh.unpersist()

    val uh_joined = uh_flag_comment.join(broadcast(smallDF), "NO_NUM")
      .select(
        uh.col("*"),
        smallDF.col("PSP"),
        smallDF.col("minrel"),
        smallDF.col("Label"),
        smallDF.col("StartDate"))
      .withColumnRenamed("DVA_1", "DVA")

    smallDF.unpersist()

    val uh_to_be_sorted = uh_joined.repartition(4158, col("PSP"))
    val uh_final = uh_to_be_sorted.sortWithinPartitions(col("NO_NUM"), col("UHDIN"), col("HOURMV"))

    uh_final

EDITED - Repartition logic

    val sqlContext = spark.sqlContext
    sqlContext.udf.register("randomUDF", (partitionCount: Int) => {
      val r = new scala.util.Random
      r.nextInt(partitionCount)
      // Also tried with r.nextInt(partitionCount) + col("PSP")
    })

    val uh_to_be_sorted = uh_joined
        .withColumn("tmp", callUDF("randomUDF", lit(4158)))
        .repartition(4158, col("tmp"))
        .drop(col("tmp"))
    val uh_final = uh_to_be_sorted.sortWithinPartitions(col("NO_NUM"), col("UHDIN"), col("HOURMV"))

    uh_final
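
(For reference, the same salting could probably be done without a UDF by using the built-in rand() function; this is only a sketch of an alternative I have not tried, not the code I currently run.)

    import org.apache.spark.sql.functions.{col, floor, lit, rand}

    // Alternative: salt with a random bucket in [0, 4158) directly, no UDF needed.
    val uh_salted = uh_joined
      .withColumn("tmp", floor(rand() * lit(4158)).cast("int"))
      .repartition(4158, col("tmp"))
      .drop("tmp")
    val uh_final_alt = uh_salted.sortWithinPartitions(col("NO_NUM"), col("UHDIN"), col("HOURMV"))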

smallDF is a small dataset (535MB) that I broadcast.

TransactionType is a class in which I add a new string column to my uh dataframe based on the values of 3 columns (MMED, DEBCRED, NMTGP), which I check using regexes.
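
To give an idea of its shape, here is a minimal, hypothetical sketch of such a transformer (the output column name, regex patterns and labels below are placeholders, not the real TransactionType logic):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, lit, when}

    class TransactionType {
      // Hypothetical example: derive a string label from regex checks
      // on MMED, DEBCRED and NMTGP (patterns and labels are made up).
      def transform(df: DataFrame): DataFrame = {
        df.withColumn("FLAG_COMMENT",
          when(col("MMED").rlike("^CB.*") && col("DEBCRED").rlike("^D$"), lit("CARD_DEBIT"))
            .when(col("NMTGP").rlike("^VIR.*"), lit("TRANSFER"))
            .otherwise(lit("OTHER")))
      }
    }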

I previously faced a lot of issues (the job failing) because of shuffle blocks that were not found. I discovered that I was spilling to disk and had a lot of GC/memory issues, so I increased "spark.sql.shuffle.partitions" to 4158.

WHY 4158?

Partition count = (stage input data) / (target size of each partition)

so shuffle partition count = (shuffle stage input data) / 200 MB = 860000 / 200 = 4300

I have 16 * 24 - 6 = 378 cores available. So if I want every wave of tasks to use all the cores, the partition count should be a multiple of 378: 4300 / 378 is approximately 11, and 11 * 378 = 4158.
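
The same arithmetic, written out as a small Scala sanity check (values taken from the numbers above):

    // Back-of-the-envelope sizing of spark.sql.shuffle.partitions.
    val shuffleInputMb    = 860000                              // ~860 GB of shuffle stage input
    val targetPartitionMb = 200                                 // target size per shuffle partition
    val rawCount          = shuffleInputMb / targetPartitionMb  // 4300

    val availableCores    = 16 * 24 - 6                         // 378 cores available
    val waves             = rawCount / availableCores           // ~11 full waves of tasks
    val shufflePartitions = waves * availableCores              // 11 * 378 = 4158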

Spark Version: 2.1

Cluster configuration:

  • 24 compute nodes (workers)
  • 16 vcores each
  • 90 GB RAM per node
  • 6 cores are already being used by other processes/jobs

Current Spark configuration:

  • master: yarn
  • executor-memory: 26G
  • executor-cores: 5
  • driver-memory: 70G
  • num-executors: 70
  • spark.kryoserializer.buffer.max=512
  • spark.driver.cores=5
  • spark.driver.maxResultSize=500m
  • spark.memory.storageFraction=0.4
  • spark.memory.fraction=0.9
  • spark.hadoop.fs.permissions.umask-mode=007

How the job is executed:

We build an artifact (jar) with IntelliJ and then send it to a server, where a bash script is executed. This script:

  • exports some environment variables (SPARK_HOME, HADOOP_CONF_DIR, PATH and SPARK_LOCAL_DIRS)

  • launches the spark-submit command with all the parameters defined in the Spark configuration above

  • retrieves the YARN logs of the application

Spark UI screenshots:

  • DAG
  • Stages of all jobs
  • Detailed stages of the job to improve
  • Stage that takes a lot of time

Comments:

  • Gelerion: On average, your tasks take about 5 minutes to execute, but you have an outlier that takes 49. It is a symptom of data skewness.
  • Ali: Thanks @Gelerion, I will investigate it.

1 Answer

1 vote

@Ali

From the Summary Metrics we can say that your data is skewed (Max Duration: 49 min and Max Shuffle Read Size/Records: 2.5 GB / 23,947,440, whereas on average a task takes about 4-5 mins and processes less than 200 MB / 1.2 MM rows).

Now that we know the problem is likely skewed data in a few partition(s), I think we can fix this by changing the repartition logic val uh_to_be_sorted = uh_joined.repartition(4158, col("PSP")): choose something else to partition on, like some other column, or add another column to PSP.
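
For example, a rough sketch of what salting the key could look like (spreading each PSP value over a few random buckets; the bucket count of 20 and the column name "salt" are arbitrary choices to illustrate the idea):

    import org.apache.spark.sql.functions.{col, floor, lit, rand}

    // Spread each PSP value over up to 20 buckets so that a single hot PSP
    // value no longer ends up in one huge partition.
    val uh_to_be_sorted = uh_joined
      .withColumn("salt", floor(rand() * lit(20)).cast("int"))
      .repartition(4158, col("PSP"), col("salt"))
      .drop("salt")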

A few links to refer to on data skew and how to fix it:

https://dzone.com/articles/optimize-spark-with-distribute-by-cluster-by

https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/

Hope this helps