3 votes

In Apache Spark 2.4.5, how do you open up a set of parquet files that were written with bucketBy and saveAsTable?

For example:

case class VeryVeryDeeplyNestedThing(
  s: String,
  nested1: OtherVeryDeeplyNestedThing
)
case class OtherVeryDeeplyNestedThing (
  youGetTheIdeaNoOneWantsToHandWriteASqlStatementForThese: NestedMcNesty
)
List(VeryVeryDeeplyNestedThing(...)).toDS()
  .write
  .bucketBy(512, "s")
  .option("path", "/tmp/output")
  .format("parquet")
  .saveAsTable("mytable")

Now there's a set of parquet files in /tmp/output. Move the files in /tmp/output to /tmp/newPlace, and start a completely new spark session.

spark.read.parquet("/tmp/newPlace")
  .whatGoesHere?

What do you need to do to read them back in with the same bucketing information that they were written with? It doesn't seem like that information is baked into the parquet files themselves, or is that what happens?

[Edit: added a worked example, taken partially from https://kb.databricks.com/_static/notebooks/data/bucketing-example.html per @thebluephantom, which I think shows that reading does in fact require something special.]

If you create the parquet files like this:

scala> def base = spark.range(1, 160000, 1, 16).select($"id" as "key", rand(12) as "value")
base: org.apache.spark.sql.DataFrame

scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode

scala> base.write.format("parquet").bucketBy(16, "key").sortBy("value").option("path", "/tmp/example").mode(SaveMode.Overwrite).saveAsTable("bucketed")

scala> base.write.format("parquet").option("path", "/tmp/exampleunbucketed").mode(SaveMode.Overwrite).saveAsTable("unbucketed")

scala> val t2 = spark.table("bucketed")
t2: org.apache.spark.sql.DataFrame = [key: bigint, value: double]

scala> val t3 = spark.table("bucketed")
t3: org.apache.spark.sql.DataFrame = [key: bigint, value: double]

// This is joining two bucketed tables

scala> t3.join(t2, Seq("key")).explain()
== Physical Plan ==
*(2) Project [key#51L, value#52, value#58]
+- *(2) BroadcastHashJoin [key#51L], [key#57L], Inner, BuildRight
   :- *(2) Project [key#51L, value#52]
   :  +- *(2) Filter isnotnull(key#51L)
   :     +- *(2) FileScan parquet default.bucketed[key#51L,value#52] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
      +- *(1) Project [key#57L, value#58]
         +- *(1) Filter isnotnull(key#57L)
            +- *(1) FileScan parquet default.bucketed[key#57L,value#58] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16

That has a FileScan parquet default.bucketed on both sides. Now just do a plain read of the parquet files, and explain the join:

scala> val t4 = spark.read.parquet("/tmp/example")
t4: org.apache.spark.sql.DataFrame = [key: bigint, value: double]

scala> t3.join(t4, Seq("key")).explain()
== Physical Plan ==
*(2) Project [key#51L, value#52, value#64]
+- *(2) BroadcastHashJoin [key#51L], [key#63L], Inner, BuildRight
   :- *(2) Project [key#51L, value#52]
   :  +- *(2) Filter isnotnull(key#51L)
   :     +- *(2) FileScan parquet default.bucketed[key#51L,value#52] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
      +- *(1) Project [key#63L, value#64]
         +- *(1) Filter isnotnull(key#63L)
            +- *(1) FileScan parquet [key#63L,value#64] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>

t4 doesn't have anything indicating that it's bucketed.

Does this matter? Is it still bucketed? Am I misreading the explain output? Or do I have to do something to make sure t4 uses buckets?

Updated answer. - thebluephantom
All works on my side as expected. Cheers. - thebluephantom

2 Answers

2 votes

You don't. bucketBy is a table-based API; it's as simple as that.

Use bucketBy to pre-sort and pre-shuffle the tables so that subsequent JOINs are faster because shuffling is avoided. It is thus most useful for ETL and for processing temporary, intermediate results in general.

Reading requires nothing special to be added to the query, but the JOINed tables must BOTH be bucketed and have the same number of buckets and partitions. See this excellent post: https://kb.databricks.com/_static/notebooks/data/bucketing-example.html. Also, spark.sql.shuffle.partitions must equal the number of buckets. See a minimal sketch of those conditions just below.
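
A minimal sketch of the above, assuming the 16-bucket "bucketed" table from the question's worked example is still registered (not something beyond what the question already built):

// Sketch only: assumes the 16-bucket "bucketed" table from the worked example.
// Match spark.sql.shuffle.partitions to the bucket count and disable broadcast
// joins so the plan shows a sort-merge join without an Exchange on either side.
spark.conf.set("spark.sql.shuffle.partitions", "16")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val left  = spark.table("bucketed")
val right = spark.table("bucketed")

// With both sides bucketed on "key" into the same number of buckets,
// explain() should show no Exchange feeding the SortMergeJoin.
left.join(right, Seq("key")).explain()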

UPDATE

With small data a broadcast hash join may occur instead, so set the following:

spark.conf.set("spark.sql.sources.bucketing.enabled", true)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

Also, I suggest you use spark.table and not spark.read.parquet...; bucketBy only works with the table API. See https://engineering.taboola.com/bucket-the-shuffle-out-of-here/
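
For the moved-files scenario in the question, one possible approach (a sketch only, under stated assumptions) is to re-declare the bucketed table over the new path and then read it through spark.table. The table name "bucketed_moved" and the path "/tmp/exampleMoved" below are made up for illustration; the bucket spec must match what was written, and the original bucketed file names must be kept, since Spark works out the bucket id from the file name:

// Sketch only: "bucketed_moved" and "/tmp/exampleMoved" are made-up names,
// assuming the files from /tmp/example were copied there with their original
// file names intact (Spark derives the bucket id from the file name).
spark.sql("""
  CREATE TABLE bucketed_moved (key BIGINT, value DOUBLE)
  USING PARQUET
  CLUSTERED BY (key) SORTED BY (value) INTO 16 BUCKETS
  LOCATION '/tmp/exampleMoved'
""")

// Reading through the table API picks the bucketing up from the metastore,
// so the FileScan should again report SelectedBucketsCount.
val t5 = spark.table("bucketed_moved")
t5.join(spark.table("bucketed"), Seq("key")).explain()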

0 votes

When we use bucketing (or clustering) while writing the data, it divides the data and saves it as multiple files. For example:

id,name,city
1,a,CA
2,b,NYC
3,c,NYC
4,d,CA

# So after bucketing on city, two files will be created:
id,name,city
1,a,CA
4,d,CA

and
id,name,city
2,b,NYC
3,c,NYC

So when we read the files from the new location, we will still be able to read all of the data.

Bucketing helps when you want to push down predicates on the bucket column, as it restricts Spark to reading only the relevant files; see the sketch below.
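
A rough sketch of that idea using the toy city data above (the table name "cities" and path "/tmp/cities" are made up for the example); Spark 2.4 can prune buckets on an equality filter over the bucket column, which shows up in the plan as a reduced SelectedBucketsCount:

// Sketch only: toy data from above; "cities" and "/tmp/cities" are made-up names.
import spark.implicits._

Seq((1, "a", "CA"), (2, "b", "NYC"), (3, "c", "NYC"), (4, "d", "CA"))
  .toDF("id", "name", "city")
  .write
  .bucketBy(2, "city")
  .option("path", "/tmp/cities")
  .format("parquet")
  .saveAsTable("cities")

// An equality filter on the bucket column lets Spark skip whole buckets;
// the FileScan in the plan should show SelectedBucketsCount: 1 out of 2.
spark.table("cities").filter($"city" === "CA").explain()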

Hope this answers the question.