5
votes

Trying to understand how Hive partitions relate to Spark partitions, culminating in a question about joins.

I have two external Hive tables, both backed by S3 buckets and partitioned by date, so in each bucket there are keys with the name format date=<yyyy-MM-dd>/<filename>.

Question 1:

If I read this data into Spark:

val table1 = spark.table("table1").as[Table1Row]
val table2 = spark.table("table2").as[Table2Row]

then how many partitions are the resultant datasets going to have respectively? Partitions equal to the number of objects in S3?
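One way I assume I could check this empirically (using the table1/table2 values defined above) is to look at the underlying RDD:

// Sketch: print how many partitions Spark actually created on read
// (table1/table2 as defined above).
println(table1.rdd.getNumPartitions)
println(table2.rdd.getNumPartitions)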

Question 2:

Suppose the two row types have the following schema:

Table1Row(date: Date, id: String, ...)
Table2Row(date: Date, id: String, ...)

and that I want to join table1 and table2 on the fields date and id:

table1.joinWith(table2,
  table1("date") === table2("date") && 
    table1("id") === table2("id")
)

Is Spark going to be able to utilize the fact that one of the fields being joined on is the partition key in the Hive tables to optimize the join? And if so how?
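I assume I could inspect the physical plan with explain() for hints, though I'm not sure what to look for:

// Sketch: print the physical plan of the join to look for any
// partition-related optimization (e.g. pruning or a missing exchange).
table1.joinWith(table2,
  table1("date") === table2("date") &&
    table1("id") === table2("id")
).explain()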

Question 3:

Suppose now that I am using RDDs instead:

val rdd1 = table1.rdd
val rdd2 = table2.rdd

AFAIK, the syntax for the join using the RDD API would look something like:

rdd1.map(row1 => ((row1.date, row1.id), row1))
  .join(rdd2.map(row2 => ((row2.date, row2.id), row2)))

Again, is Spark going to be able to utilize the fact that the partition key in the Hive tables is being used in the join?


2 Answers

4
votes

then how many partitions are the resultant datasets going to have respectively? Partitions equal to the number of objects in S3?

Impossible to answer given the information you've provided. In recent versions the number of partitions depends primarily on spark.sql.files.maxPartitionBytes, although other factors can play some role as well.
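If you want to see the effect, a minimal sketch (assuming the table1 from the question) is to change the setting and re-read the table:

// Sketch: the read-side partition count changes with this setting
// (assuming the table1 from the question; other factors still apply).
spark.conf.set("spark.sql.files.maxPartitionBytes", 32L * 1024 * 1024) // 32 MB
println(spark.table("table1").rdd.getNumPartitions)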

Is Spark going to be able to utilize the fact that one of the fields being joined on is the partition key in the Hive tables to optimize the join?

Not as of today (Spark 2.3.0); however, Spark can utilize bucketing (DISTRIBUTE BY) to optimize joins. See How to define partitioning of DataFrame?. This might change in the future, once the Data Source API v2 stabilizes.
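For illustration, a minimal sketch of the bucketing approach (the bucket count and table names are illustrative, not from the question):

// Sketch: write both tables bucketed and sorted by the join keys, so a
// sort-merge join over the bucketed tables can avoid a full shuffle.
table1.write
  .bucketBy(32, "date", "id")
  .sortBy("date", "id")
  .saveAsTable("table1_bucketed")

table2.write
  .bucketBy(32, "date", "id")
  .sortBy("date", "id")
  .saveAsTable("table2_bucketed")

spark.table("table1_bucketed")
  .join(spark.table("table2_bucketed"), Seq("date", "id"))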

Suppose now that I am using RDDs instead (...) Again, is Spark going to be able to utilise the fact that the partition key in the Hive tables is being used in the join?

Not at all. Even if the data is bucketed, RDD transformations and functional Dataset transformations are black boxes. No optimization can be, or is, applied here.

0
votes

Answering in general,

Spark partition - a (logical) chunk of a large distributed dataset. Spark spawns a single task per partition, and that task runs inside an executor JVM.

Hive partitioning is a way to organize tables by dividing them into different parts based on partition keys (columns). Partitions make it simpler and clearer to access the data.
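For example, a date-partitioned external table like the one in the question might be declared as follows (the bucket path and columns are illustrative):

// Sketch: an external, date-partitioned Hive table backed by S3
// (path and columns are illustrative, not taken from the question).
spark.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS table1 (id STRING)
  PARTITIONED BY (`date` STRING)
  STORED AS PARQUET
  LOCATION 's3://my-bucket/table1/'
""")
spark.sql("MSCK REPAIR TABLE table1") // register the existing date=... partitions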

A few configurations that can be tweaked (a sketch of setting them follows the list) -

spark.sql.files.maxPartitionBytes - The maximum number of bytes to pack into a single partition when reading files (default 128MB)

spark.sql.files.openCostInBytes - The estimated cost to open a file, measured in the number of bytes that could be scanned in the same time. This is used when packing multiple files into a partition. It is better to over-estimate it; partitions with small files will then be faster than partitions with bigger files (which are scheduled first). (default 4 MB)

spark.sql.shuffle.partitions - Configures the number of partitions to use when shuffling data for joins or aggregations. (defaults to 200)
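A minimal sketch of setting these at session startup (the values shown are just the defaults):

// Sketch: setting the configurations above when building the session
// (values shown are the defaults and purely illustrative).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("partition-tuning")
  .config("spark.sql.files.maxPartitionBytes", 128L * 1024 * 1024)
  .config("spark.sql.files.openCostInBytes", 4L * 1024 * 1024)
  .config("spark.sql.shuffle.partitions", 200)
  .enableHiveSupport()
  .getOrCreate()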