4
votes

So the question is in the subject. I think I don't understand correctly how repartition works. In my mind, when I say somedataset.repartition(600) I expect all the data to be partitioned into equal-size parts across the workers (let's say 60 workers).

So, for example, I have a big chunk of data to load from unbalanced files, say 400 files, where 20% are 2 GB in size and the other 80% are about 1 MB. I have this code to load the data:

// Load all raw TSV files into a single DataFrame
val source = sparkSession.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .option("delimiter", "\t")
  .load(mypath)

Then I want to convert the raw data to my intermediate object, filter out irrelevant records, convert to the final object (with additional attributes), and then partition by some columns and write to parquet. In my mind it seems reasonable to balance the data (600 partitions) across the workers and then do the work like this:

val ds: Dataset[FinalObject] = source.repartition(600) // full shuffle into 600 even partitions
  .map(parse)                 // raw row -> intermediate object
  .filter(filter.IsValid(_))  // drop irrelevant records
  .map(convert)               // intermediate -> final object with extra attributes
  .persist(StorageLevel.DISK_ONLY)
val count = ds.count
log(count)
val partitionColumns = List("region", "year", "month", "day")

ds.repartition(partitionColumns.map(new org.apache.spark.sql.Column(_)): _*) // second shuffle, now by column values
  .write.partitionBy(partitionColumns: _*) // one output directory per region/year/month/day
  .format("parquet")
  .mode(SaveMode.Append)
  .save(destUrl)

But it fails with:

ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 34.6 GB of 34.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

When I do not repartition, everything works fine. What am I not understanding correctly about repartition?

1 Answer

2
votes

Your logic is correct for repartition as well as partitionBy, but before using repartition you need to keep in mind this point, which appears in several sources:

Keep in mind that repartitioning your data is a fairly expensive operation. Spark also has an optimized version of repartition() called coalesce() that allows avoiding data movement, but only if you are decreasing the number of RDD partitions.
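
As a minimal sketch of the difference (ds below stands in for any Dataset, such as the one in your question):

// repartition(n) always performs a full shuffle: every row may be
// exchanged over the network, even when reducing the partition count.
val reshuffled = ds.repartition(600)

// coalesce(n) merges existing partitions in place and avoids the
// shuffle, but it can only decrease the number of partitions.
val merged = ds.coalesce(100)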

If you need the job to complete as written, increase the driver and executor memory, and in particular the spark.yarn.executor.memoryOverhead setting that the error message itself suggests boosting.
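
For example, with spark-submit the relevant settings can be raised like this (the sizes are placeholders to tune for your cluster; note that spark.yarn.executor.memoryOverhead takes a value in megabytes):

spark-submit \
  --conf spark.driver.memory=8g \
  --conf spark.executor.memory=30g \
  --conf spark.yarn.executor.memoryOverhead=6144 \
  <rest of your submit command>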