11
votes

Is there any way to append a new column to an existing parquet file?

I'm currently working on a kaggle competition, and I've converted all the data to parquet files.

Here is the case: I read the parquet file into a PySpark DataFrame, did some feature extraction, and appended new columns to the DataFrame with pyspark.sql.DataFrame.withColumn().

After that, I want to save the new columns in the source parquet file.
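
Roughly what I am doing (the path and column names below are just placeholders):

# read the parquet data I converted earlier
df = sqlContext.read.parquet('data/train.parquet')

# feature extraction: derive a new column with withColumn()
df = df.withColumn('feat_squared', df['feat'] * df['feat'])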

I know Spark SQL comes with Parquet schema evolution, but the example only shows a key-value case.
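
For reference, the schema-merging example I am referring to looks roughly like this (each schema variant is written under its own key= directory, so the new column only exists for newly written rows, not for the rows that are already there):

df1 = sqlContext.createDataFrame([(1, 2)], ['single', 'double'])
df1.write.parquet('data/test_table/key=1')

df2 = sqlContext.createDataFrame([(3, 9)], ['single', 'triple'])
df2.write.parquet('data/test_table/key=2')

# reading the whole directory merges the two schemas
merged = sqlContext.read.option('mergeSchema', 'true').parquet('data/test_table')
merged.printSchema()  # single, double, triple, key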

The parquet "append" mode doesn't do the trick either; it only appends new rows to the parquet file. Is there any way to append a new column to an existing parquet file instead of regenerating the whole table? Or do I have to generate a separate new parquet file and join them at runtime?
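
The runtime-join workaround I have in mind would be something like this (again, the paths and the join key are placeholders):

# write only the key and the derived columns to a second file
df.select('id', 'feat_squared').write.parquet('data/train_features.parquet')

# later, at runtime, stitch the original and the new columns back together
full = sqlContext.read.parquet('data/train.parquet') \
    .join(sqlContext.read.parquet('data/train_features.parquet'), 'id')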

3
Architecturally speaking, appending a new column to an existing parquet file cannot be done; it would amount to playing around with the metadata of the parquet file. - Aviral Kumar
Although you can try to rewrite it by first changing the schema. I am not very sure, though, how this happens in spark-sql. - Aviral Kumar
Yeah, changing the schema in spark-sql is easy, but overwriting the whole parquet file is costly, which means I have to recompute the whole table again. Thanks for your comment, @AviralKumar - Chu-Yu Hsu

3 Answers

6
votes

In parquet you don't modify files in place: you read them, modify the data, and write them back. You cannot change just one column; you need to read and write the full file.
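
In practice that means something like this (a minimal sketch; the path and the new column are only examples):

# read the whole file, derive the extra column, and write everything out again
df = spark.read.parquet('/data/table.parquet')
df = df.withColumn('new_col', df['old_col'] * 2)

# write to a new location; overwriting the path you are still reading from is unsafe
df.write.mode('overwrite').parquet('/data/table_v2.parquet')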

5
votes

Although this question was posted two years ago and still has no accepted answer, let me answer my own question.

At the time I was still working with Spark, the version was 1.4. I don't know about newer versions, but in that version, adding a new column to an existing parquet file was impossible.

0
votes

Yes, it is possible with both Databricks Delta and parquet tables. An example is given below.

This example is written in Python (PySpark):

df = sqlContext.createDataFrame([('1','Name_1','Address_1'),('2','Name_2','Address_2'),('3','Name_3','Address_3')], schema=['ID', 'Name', 'Address'])

delta_tblNm = 'testDeltaSchema.test_delta_tbl'
parquet_tblNm = 'testParquetSchema.test_parquet_tbl'

delta_write_loc = 'dbfs:///mnt/datalake/stg/delta_tblNm'
parquet_write_loc = 'dbfs:///mnt/datalake/stg/parquet_tblNm'


# DELTA TABLE
df.write.format('delta').mode('overwrite').option('overwriteSchema', 'true').save(delta_write_loc)
spark.sql(" create table if not exists {} using DELTA LOCATION '{}'".format(delta_tblNm, delta_write_loc))
spark.sql("refresh table {}".format(print(cur_tblNm)))

# PARQUET TABLE
df.write.format("parquet").mode("overwrite").save(parquet_write_loc)
spark.sql("""CREATE TABLE if not exists {} USING PARQUET LOCATION '{}'""".format(parquet_tblNm, parquet_write_loc))
spark.sql(""" REFRESH TABLE {} """.format(parquet_tblNm))

test_df = spark.sql("select * testDeltaSchema.test_delta_tbl")
test_df.show()

test_df = spark.sql("select * from testParquetSchema.test_parquet_tbl")
test_df.show()

test_df = spark.sql("ALTER TABLE  testDeltaSchema.test_delta_tbl ADD COLUMNS (Mob_number String COMMENT 'newCol' AFTER Address)")
test_df.show()

test_df = spark.sql("ALTER TABLE  testParquetSchema.test_parquet_tbl ADD COLUMNS (Mob_number String COMMENT 'newCol' AFTER Address)")
test_df.show()