2 votes

I have a Hive table that contains a lot of small Parquet files, and I am creating a Spark DataFrame from it to do some processing with SparkSQL. Since I have a large number of splits/files, my Spark job creates a lot of tasks, which I don't want. Basically, I want the same functionality that Hive provides: combining these small input splits into larger ones by specifying a max split size setting. How can I achieve this with Spark? I tried the coalesce function, but with it I can only specify the number of partitions (and therefore only the number of output files). Instead, I want some control over the (combined) input split size that a task processes.

Edit: I am using Spark itself, not Hive on Spark.
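For reference, one thing that is sometimes suggested for the plain-Spark case is to raise the Hadoop split-size properties on the SparkContext's Hadoop configuration before reading the table. This is only a sketch: sc is assumed to be the SparkContext, the 256 MB / 128 MB values are placeholders, and whether the settings have any effect depends on which input format the Hive table resolves to.

// Sketch: ask the underlying Hadoop input format for larger splits.
// Only honored by input formats that respect these properties; values are examples.
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", (256 * 1024 * 1024).toString)
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.split.minsize", (128 * 1024 * 1024).toString)

val df = sqlContext.table("schema.test_table")
  .filter($"my_partition_column" === "12345")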

Edit 2: Here is the current code I have:

//create a data frame from a test table
val df = sqlContext.table("schema.test_table").filter($"my_partition_column" === "12345")

//coalesce it to a fixed number of partitions. But as I said in my question,
//with coalesce I cannot control the file sizes, I can only specify
//the number of partitions
df.coalesce(8).write.mode(org.apache.spark.sql.SaveMode.Overwrite)
  .insertInto("schema.test_table")
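If upgrading is an option, a related knob exists in Spark 2.x: when Spark reads a Parquet-backed Hive table with its built-in reader (spark.sql.hive.convertMetastoreParquet, which defaults to true), the property spark.sql.files.maxPartitionBytes caps how many bytes are packed into one input partition, so many small files are combined into fewer, larger tasks. A minimal sketch, assuming Spark 2.x and a spark SparkSession (the question uses a 1.x sqlContext); the 256 MB value is an example:

// Sketch for Spark 2.x: cap the bytes packed into each input partition so
// small Parquet files are combined into fewer tasks. Value is an example.
import spark.implicits._

spark.conf.set("spark.sql.files.maxPartitionBytes", 256L * 1024 * 1024)

val df = spark.table("schema.test_table")
  .filter($"my_partition_column" === "12345")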

1 Answer

1 vote

I have not tried it myself, but the Hive on Spark getting started guide indicates that setting the property hive.merge.sparkfiles=true should work:

https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started
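For completeness, a sketch of how such a Hive property could be set from a HiveContext via setConf; note that hive.merge.sparkfiles is documented for Hive on Spark, so it may have no effect when the query is executed by Spark SQL itself, as in the question:

// Sketch: pass the Hive property through the HiveContext.
// hive.merge.sparkfiles targets Hive on Spark and may be a no-op
// when Spark SQL runs the query directly.
sqlContext.setConf("hive.merge.sparkfiles", "true")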

If you are using Spark on Hive, Spark's abstractions do not provide an explicit split of the data. However, you can control the parallelism in several ways:

  1. You can use DataFrame.repartition(numPartitions: Int) to explicitly control the number of partitions (see the sketch after this list).
  2. If you are using a HiveContext, ensure that hive-site.xml configures a combined input format (e.g. CombineHiveInputFormat). That may help.
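As referenced in item 1, here is a minimal sketch of the repartition route that derives the partition count from the data's on-disk size; the HDFS path, the 256 MB target, and the FileSystem-based size estimate are illustrative placeholders, not part of the original answer:

// Sketch: size each task's input at roughly targetBytes by deriving the
// partition count from the data's on-disk size. Path and target are placeholders.
import org.apache.hadoop.fs.{FileSystem, Path}

val targetBytes = 256L * 1024 * 1024
val fs = FileSystem.get(sc.hadoopConfiguration)
val inputBytes = fs.getContentSummary(new Path("/path/to/table/my_partition_column=12345")).getLength
val numPartitions = math.max(1, (inputBytes / targetBytes).toInt)

// df is the DataFrame from the question
df.repartition(numPartitions)
  .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
  .insertInto("schema.test_table")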

For more info, take a look at the following documentation about Spark data parallelism: http://spark.apache.org/docs/latest/tuning.html#level-of-parallelism