
Background

I have 8k parquet files representing a table that I want to bucket by a particular column, creating a new set of 8k parquet files. I want to do this so that joins from other data sets on the bucketed column won't require re-shuffling. The documentation I'm working off of is here:

https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#bucketing-sorting-and-partitioning

Question

What's the easiest way to output parquet files that are bucketed? I want to do something like this:

df.write()
    .bucketBy(8000, "myBucketCol")
    .sortBy("myBucketCol")
    .format("parquet")
    .save("path/to/outputDir");

But according to the documentation linked above:

Bucketing and sorting are applicable only to persistent tables

I'm guessing I need to use saveAsTable as opposed to save. However, saveAsTable doesn't take a path. Do I need to create a table prior to calling saveAsTable? Is it in that table-creation statement that I declare where the parquet files should be written? If so, how do I do that?


2 Answers

Answer 1

Create the bucketed table first, declaring its location, and then append into it:
spark.sql("drop table if exists myTable");
spark.sql("create table myTable ("
    + "myBucketCol string, otherCol string ) "
    + "using parquet location '" + outputPath + "' "
    + "clustered by (myBucketCol) sorted by (myBucketCol) into 8000 buckets"
);
enlDf.write()
    .bucketBy(8000, "myBucketCol")
    .sortBy("myBucketCol")
    .format("parquet")
    .mode(SaveMode.Append)
    .saveAsTable("myTable");
Answer 2

You can use the path option:

// The "path" option makes saveAsTable write the parquet files to that
// directory (an external table) instead of the default warehouse location.
df.write()
    .bucketBy(8000, "myBucketCol")
    .sortBy("myBucketCol")
    .format("parquet")
    .option("path", "path/to/outputDir")
    .saveAsTable("whatever");