I have a dataframe that looks a bit like this:

| key 1 | key 2 | key 3 | body |
|-------|-------|-------|------|

I want to save this dataframe as one JSON file per partition, where a partition is a unique combination of keys 1 to 3. I have the following requirements:

  • The paths of the files should be /key 1/key 2/key 3.json.gz
  • The files should be compressed
  • The contents of the files should be the values of body (this column contains a JSON string), one JSON string per line (a tiny made-up example follows below).
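
For concreteness, here is a tiny dataframe in the same shape; the key values and JSON payloads are made up purely for illustration, and an active SparkSession is assumed. With this input, the goal would be a single file /a/b/c.json.gz whose decompressed contents are the two body strings, one per line:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Made-up rows, only meant to show the shape of the data
df = spark.createDataFrame(
  [("a", "b", "c", '{"id": 1}'),
   ("a", "b", "c", '{"id": 2}')],
  ["key 1", "key 2", "key 3", "body"],
)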

I've tried multiple things, but no luck.

Method 1: Using native dataframe.write

I've tried using the native write method to save the data, something like this:

df.write \
  .partitionBy("key 1", "key 2", "key 3") \
  .mode('overwrite') \
  .format('json') \
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
  .save(
    path=path,
    compression="gzip"
  )

This solution doesn't store the files at the correct path or with the correct name, but that can be fixed by moving them afterwards. The bigger problem is that it writes the complete dataframe, while I only want to write the values of the body column; at the same time, I need the other columns to partition the data.
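
To make the conflict concrete, here is a minimal sketch of both halves of it; nothing beyond the column names from the table above is assumed, and path stands for the target directory:

# Dropping the keys makes partitioning impossible:
df.select("body").write.partitionBy("key 1", "key 2", "key 3").format("json").save(path)
# -> fails, because the partition columns are no longer in the dataframe

# Keeping the keys lets partitionBy build the directory layout, but the JSON
# writer then wraps each line as {"body": "..."} instead of writing the raw string:
df.write.partitionBy("key 1", "key 2", "key 3").format("json").save(path)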

Method 2: Using the Hadoop filesystem

It's possible to call the Hadoop filesystem Java library directly using sc._gateway.jvm.org.apache.hadoop.fs.FileSystem. With access to this filesystem I can create the files myself, giving me more control over the path, the filename and the contents. However, to make this code scale I'm doing it per partition, so:

def save_partition(items):
  # Store the items of this partition here
  ...

df.foreachPartition(save_partition)
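
For reference, this is roughly the shape I have in mind for save_partition; it is only a sketch of the intent, using the Python standard library on the worker, and the "/output" prefix plus writing to a local path are placeholders, since where a worker can actually write to is exactly the open question:

import gzip
import os
from itertools import groupby
from operator import itemgetter

def save_partition(items):
  # Group the rows of this Spark partition by the three keys and append the
  # raw body strings, one per line, to a gzipped file per key combination.
  rows = sorted(items, key=itemgetter("key 1", "key 2", "key 3"))
  for (k1, k2, k3), group in groupby(rows, key=itemgetter("key 1", "key 2", "key 3")):
    target_dir = os.path.join("/output", str(k1), str(k2))  # placeholder destination
    os.makedirs(target_dir, exist_ok=True)
    with gzip.open(os.path.join(target_dir, f"{k3}.json.gz"), "at") as f:
      for row in group:
        f.write(row["body"] + "\n")

The file is opened in append mode because rows with the same key combination can be spread over multiple Spark partitions (and multiple workers), which is part of what makes this hard to get right.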

However, I can't get this to work, because the save_partition function is executed on the workers, which don't have access to the SparkSession or the SparkContext (both of which are needed to reach the Hadoop filesystem JVM classes). I could work around this by pulling all the data to the driver using collect() and saving it from there, but that won't scale.
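
For completeness, this is the kind of driver-side access I mean; it assumes an active SparkSession bound to spark, and the path and written line are made up for illustration. None of this is reachable inside foreachPartition, because the py4j gateway only exists on the driver:

# Driver-side only: reach the Hadoop FileSystem through the py4j gateway
jvm = spark.sparkContext._gateway.jvm
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
fs = jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)

out = fs.create(jvm.org.apache.hadoop.fs.Path("/key 1/key 2/key 3.json.gz"))
out.write(bytearray('{"some": "body"}\n', "utf-8"))  # raw bytes; gzip would still need the codec
out.close()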

So, quite a story, but I'd rather be complete here. What am I missing? Is what I want impossible, or am I overlooking something obvious? Or is it just difficult? Or is it perhaps only possible from Scala/Java? I would love some help with this.