6 votes

I'm pretty new to Spark (2 days) and I'm pondering the best way to partition parquet files.

My rough plan ATM is:

  • read in the source TSV files with com.databricks.spark.csv (these have a TimestampType column)
  • write out parquet files, partitioned by year/month/day/hour
  • use these parquet files for all the queries that'll then be occurring in future

It's been ludicrously easy (kudos to the Spark devs) to get a simple version working - except for partitioning the way I'd like to. This is in Python, BTW:

input = sqlContext.read.format('com.databricks.spark.csv').load(source, schema=myschema)
input.write.partitionBy('type').format("parquet").save(dest, mode="append")

Is the best approach to map over the RDD, adding new columns for year, month, day and hour, and then use partitionBy? And then for any queries, manually add the year/month etc. predicates? Given how elegant I've found Spark to be so far, this seems a little odd.

Thanks

1 Answer

4 votes

I've found a few ways to do this now. I haven't run performance tests over them yet, so caveat emptor:

First we need to create a derived DataFrame with the new partition column (three ways shown below), and then write it out.

1) sql queries (inline functions)

from pyspark.sql.types import IntegerType

# the UDF receives the timestamp value as a datetime.datetime
sqlContext.registerFunction("day", lambda f: f.day, IntegerType())
input.registerTempTable("input")
input_ts = sqlContext.sql(
    "select day(inserted_at) AS inserted_at_day, * from input")

2) sql queries (non-inline) - very similar

def day(ts):
    return ts.day

sqlContext.registerFunction("day", day, IntegerType())
... rest as before

3) withColumn

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

day = udf(lambda f: f.day, IntegerType())
input_ts = input.withColumn('inserted_at_day', day(input.inserted_at))
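
If you want the full year/month/day/hour layout from the original question, the same withColumn pattern extends naturally. A minimal, untested sketch (it assumes inserted_at is a non-null TimestampType column, so each UDF receives a datetime.datetime):

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# one UDF per partition column; each receives a datetime.datetime
year_udf  = udf(lambda ts: ts.year,  IntegerType())
month_udf = udf(lambda ts: ts.month, IntegerType())
day_udf   = udf(lambda ts: ts.day,   IntegerType())
hour_udf  = udf(lambda ts: ts.hour,  IntegerType())

input_ts = (input
    .withColumn('year',  year_udf(input.inserted_at))
    .withColumn('month', month_udf(input.inserted_at))
    .withColumn('day',   day_udf(input.inserted_at))
    .withColumn('hour',  hour_udf(input.inserted_at)))

You'd then pass partitionBy(['year', 'month', 'day', 'hour']) in the write shown below instead of the single column.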

To write it out, it's just:

input_ts.write.partitionBy(['inserted_at_day']).format("parquet").save(dest, mode="append")
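
When reading the data back, Spark should discover the partition columns from the directory layout, so a filter on them prunes whole directories instead of scanning every file. A rough sketch, assuming dest is the same path used above:

df = sqlContext.read.parquet(dest)
# the filter on the partition column only touches the matching directories
df.filter(df.inserted_at_day == 15).show()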