I am working on something that automatically registers tables and partitions in the Hive metastore after writing to S3.

Before I can register the partitions, I need to know all the partition values. Right now I call ds.select(partitionColumn).distinct().collectAsList(); to get them.

Is there a better way to get the partition values from my dataset?

AWS Glue already does this for you. - Thiago Baldim
I'm not aware of a better solution; that's also the way I do it. - Raphael Roth
@ThiagoBaldim we have looked at AWS Glue, but it doesn't seem to allow us to use it as a metastore service for external products like Tableau, Databricks, etc. - Yang Jian
@RaphaelRoth yep, it works. But if the dataset is big, it will take a while. I was wondering, since I call ds.write.partitionBy.save first, it has already written the data to all partitions. Did figure out a way to get that though. - Yang Jian
Oh, that is true; this would let you work easily with your EMR. But if you need to pull information from it, you can try to build something on top of Boto. - Thiago Baldim

1 Answer

After reading the Spark source code, specifically AlterTableRecoverPartitionsCommand in ddl.scala (package org.apache.spark.sql.execution.command), which is the Spark implementation of ALTER TABLE RECOVER PARTITIONS, I found that it scans all the partition directories under the table location and then registers them.

So here is the same idea: scan the partition directories under the location that we just wrote to.
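
To make the scanning idea concrete, here is a minimal sketch that walks a directory tree and collects the key=value directories a fixed number of levels below the root. It uses java.nio on a local file system purely as a stand-in for the Hadoop FileSystem; the class name PartitionScanner and the depth parameter (the number of partition columns) are assumptions for illustration, not anything taken from Spark.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: a local-filesystem analogue of scanning partition directories.
final class PartitionScanner {
    private PartitionScanner() {}

    /** Returns the partition directories exactly `depth` levels below root, as sorted '/'-separated relative paths. */
    static List<String> scan(Path root, int depth) throws IOException {
        List<String> found = new ArrayList<>();
        walk(root, root, depth, found);
        Collections.sort(found);
        return found;
    }

    private static void walk(Path root, Path dir, int remaining, List<String> found) throws IOException {
        if (remaining == 0) {
            // Reached the leaf partition level: record the path relative to the table root.
            found.add(root.relativize(dir).toString().replace('\\', '/'));
            return;
        }
        try (DirectoryStream<Path> children = Files.newDirectoryStream(dir)) {
            for (Path child : children) {
                String name = child.getFileName().toString();
                // Keep only directories named key=value; skip files, hidden entries,
                // and markers such as _SUCCESS or _temporary.
                boolean looksLikePartition = name.indexOf('=') > 0
                        && !name.startsWith(".") && !name.startsWith("_");
                if (Files.isDirectory(child) && looksLikePartition) {
                    walk(root, child, remaining - 1, found);
                }
            }
        }
    }
}
```

For a table partitioned by two columns, PartitionScanner.scan(tableRoot, 2) would return entries such as dateid=20171010/hour=01, where hour is a made-up second partition column.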

Get the directory (key) names from the listing, then extract the partition name and value from each one.
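
As a rough illustration of that extraction step, the sketch below splits a directory name such as dateid=20171010 at the first '=' and builds an ordered partition spec from a relative path. The class and method names (PartitionNames, parseSegment, parseSpec) are hypothetical, and the sketch assumes plain values: it does not undo the escaping that Hive and Spark apply to special characters in partition path names.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for turning partition directory names into key/value pairs.
final class PartitionNames {
    private PartitionNames() {}

    /** Splits "key=value" at the first '='; returns null if the name is not of that form. */
    static Map.Entry<String, String> parseSegment(String name) {
        int eq = name.indexOf('=');
        if (eq <= 0 || eq == name.length() - 1) {
            return null; // no '=', empty key, or empty value
        }
        return new SimpleImmutableEntry<>(name.substring(0, eq), name.substring(eq + 1));
    }

    /** Parses a relative path such as "a=1/b=2" into an ordered column-to-value map. */
    static Map<String, String> parseSpec(String relativePath) {
        Map<String, String> spec = new LinkedHashMap<>();
        for (String segment : relativePath.split("/")) {
            if (segment.isEmpty()) {
                continue; // tolerate leading or doubled slashes
            }
            Map.Entry<String, String> kv = parseSegment(segment);
            if (kv == null) {
                throw new IllegalArgumentException("Not a partition directory: " + segment);
            }
            spec.put(kv.getKey(), kv.getValue());
        }
        return spec;
    }
}
```

A LinkedHashMap is used so that the partition columns keep the order in which they appear in the path, which matches the order given to partitionBy.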

Here is a code snippet that lists the paths.

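import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

// Location that was just written to.
String location = "s3n://somebucket/somefolder/dateid=20171010/";
Path root = new Path(location);

// Resolve the file system for the path from the session's Hadoop configuration.
Configuration hadoopConf = sparkSession.sessionState().newHadoopConf();
FileSystem fs = root.getFileSystem(hadoopConf);

// Honor any input path filter configured on the job (null if none is set).
JobConf jobConf = new JobConf(hadoopConf, this.getClass());
final PathFilter pathFilter = FileInputFormat.getInputPathFilter(jobConf);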

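// List the children of root, skipping marker files and hidden entries.
FileStatus[] fileStatuses = fs.listStatus(root, path -> {
    String name = path.getName();
    // Compare strings with equals(); != only compares object references in Java.
    if (!"_SUCCESS".equals(name) && !"_temporary".equals(name) && !name.startsWith(".")) {
        return pathFilter == null || pathFilter.accept(path);
    } else {
        return false;
    }
});

// Print the name of each remaining entry.
for (FileStatus fileStatus : fileStatuses) {
    System.out.println(fileStatus.getPath().getName());
}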
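
Once the names and values are known, one possible way to register a partition is an ALTER TABLE ... ADD PARTITION statement. The sketch below only builds the statement text. The class name AddPartitionSql and the table name mydb.events are made up for illustration, all partition values are assumed to be strings, and the quote escaping is deliberately naive.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that renders an ADD PARTITION statement from a partition spec.
final class AddPartitionSql {
    private AddPartitionSql() {}

    /** Builds the statement; spec maps partition column to value in partition order. */
    static String build(String table, Map<String, String> spec, String location) {
        StringJoiner cols = new StringJoiner(", ", "(", ")");
        for (Map.Entry<String, String> e : spec.entrySet()) {
            // Assumption: every value is rendered as a quoted string literal.
            cols.add(e.getKey() + "='" + escape(e.getValue()) + "'");
        }
        return "ALTER TABLE " + table + " ADD IF NOT EXISTS PARTITION " + cols
                + " LOCATION '" + escape(location) + "'";
    }

    /** Naive escaping of backslashes and single quotes for a SQL string literal. */
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("'", "\\'");
    }
}
```

With a spec of dateid -> 20171010 and the location from the snippet above, build("mydb.events", spec, location) yields ALTER TABLE mydb.events ADD IF NOT EXISTS PARTITION (dateid='20171010') LOCATION 's3n://somebucket/somefolder/dateid=20171010/', which could then be passed to sparkSession.sql(...).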