In Apache Spark 2.4.5, how do you open up a set of parquet files that were written with bucketBy and saveAsTable?
For example:
case class VeryVeryDeeplyNestedThing(
  s: String,
  nested1: OtherVeryDeeplyNestedThing
)
case class OtherVeryDeeplyNestedThing(
  youGetTheIdeaNoOneWantsToHandWriteASqlStatementForThese: NestedMcNesty
)

List(VeryVeryDeeplyNestedThing(...)).toDS()
  .write
  .bucketBy(512, "s")
  .option("path", "/tmp/output")
  .format("parquet")
  .saveAsTable("mytable")
Now there's a set of parquet files in /tmp/output. Move the files in /tmp/output to /tmp/newPlace, and start a completely new spark session.
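To be concrete, the "move" is just relocating the files outside of Spark, something like the following (assuming a local filesystem for illustration; on HDFS it would be the hadoop fs -mv equivalent):

import scala.sys.process._
// Plain filesystem move; the metastore entry created by saveAsTable no longer
// points at these files afterwards.
Seq("mv", "/tmp/output", "/tmp/newPlace").!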
spark.read.parquet("/tmp/newPlace")
.whatGoesHere?
What do you need to do to read them back in with the same bucketing information that they were written with? It doesn't seem like that information is baked into the parquet files themselves, or is it?
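My only guess so far is that the bucketing has to be re-declared through the metastore rather than through the reader, something like the sketch below. This is pure guesswork: the table name is made up, I don't know whether the schema can be inferred or has to be written out, and I don't know whether this actually restores the bucketing on read.

// Guesswork, not a known-good answer: register an external table over the
// moved files with the same bucket spec that was used on write.
// "mytable_moved" is a hypothetical name.
spark.sql("""
  CREATE TABLE mytable_moved
  USING parquet
  CLUSTERED BY (s) INTO 512 BUCKETS
  LOCATION '/tmp/newPlace'
""")
val reread = spark.table("mytable_moved")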
[Edit: added a worked example, adapted in part from https://kb.databricks.com/_static/notebooks/data/bucketing-example.html per @thebluephantom, which I think shows that reading the files back does in fact require something special.]
If you create the parquet files like this:
scala> def base = spark.range(1, 160000, 1, 16).select($"id" as "key", rand(12) as "value")
base: org.apache.spark.sql.DataFrame
scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode
scala> base.write.format("parquet").bucketBy(16, "key").sortBy("value").option("path", "/tmp/example").mode(SaveMode.Overwrite).saveAsTable("bucketed")
scala> base.write.format("parquet").option("path", "/tmp/exampleunbucketed").mode(SaveMode.Overwrite).saveAsTable("unbucketed")
scala> val t2 = spark.table("bucketed")
t2: org.apache.spark.sql.DataFrame = [key: bigint, value: double]
scala> val t3 = spark.table("bucketed")
t3: org.apache.spark.sql.DataFrame = [key: bigint, value: double]
// This joins the bucketed table with itself (t2 and t3 both come from spark.table("bucketed"))
scala> t3.join(t2, Seq("key")).explain()
== Physical Plan ==
*(2) Project [key#51L, value#52, value#58]
+- *(2) BroadcastHashJoin [key#51L], [key#57L], Inner, BuildRight
:- *(2) Project [key#51L, value#52]
: +- *(2) Filter isnotnull(key#51L)
: +- *(2) FileScan parquet default.bucketed[key#51L,value#52] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
+- *(1) Project [key#57L, value#58]
+- *(1) Filter isnotnull(key#57L)
+- *(1) FileScan parquet default.bucketed[key#57L,value#58] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
That has FileScan parquet default.bucketed on both sides, each with SelectedBucketsCount: 16 out of 16. Now do a plain read of the parquet files and explain the join:
scala> val t4 = spark.read.parquet("/tmp/example")
t4: org.apache.spark.sql.DataFrame = [key: bigint, value: double]
scala> t3.join(t4, Seq("key")).explain()
== Physical Plan ==
*(2) Project [key#51L, value#52, value#64]
+- *(2) BroadcastHashJoin [key#51L], [key#63L], Inner, BuildRight
:- *(2) Project [key#51L, value#52]
: +- *(2) Filter isnotnull(key#51L)
: +- *(2) FileScan parquet default.bucketed[key#51L,value#52] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
+- *(1) Project [key#63L, value#64]
+- *(1) Filter isnotnull(key#63L)
+- *(1) FileScan parquet [key#63L,value#64] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
The FileScan for t4 shows no SelectedBucketsCount, so nothing indicates that it's bucketed.
Does this matter? Is it still bucketed? Am I misreading the explain output? Or do I have to do something to make sure t4 uses buckets?
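One thing I did notice: the bucket spec does show up in the catalog metadata for the table created by saveAsTable, which makes me suspect it lives in the metastore rather than in the parquet files themselves. For example (the exact field names in the output are from memory, so treat this as approximate):

// The saved table has a metastore entry to inspect; the DataFrame from
// spark.read.parquet has no catalog entry at all.
spark.sql("DESCRIBE EXTENDED bucketed").show(100, truncate = false)
// expecting something like "Num Buckets: 16" and "Bucket Columns: [`key`]"
// in the detailed table information section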