I have a dataframe that is produced every time a process runs, which means I end up with a dataframe of one row and 4 columns each time the process completes. I then insert it into a Hive table using the dataframe writer, in parquet format. Because only one record is written at a time, I'm seeing a large number of small files in the table folder in HDFS.

Could you please let me know how to reduce the number of files, or write to the same parquet file, when I write the data to the Hive table?

hdfs location: user_id/employe_db/market_table/
from:
part-04498-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04497-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04496-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04496-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04450-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04449-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet

to:
part-03049-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet

How can I reduce the number of parquet files to a small, fixed number and load/write the new data into the existing files?
part-04499-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet

1 Answer

Before writing to HDFS, you can call repartition(1) so that each execution creates exactly one file.

df.repartition(1).write.mode("append").parquet("<directory>")
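
For the Hive-table case described in the question, the same idea might look like the sketch below. The function name append_run_to_table and its table_name argument are hypothetical, and df is assumed to be a Spark DataFrame whose column order matches the target table. Note that this still produces one file per run; it does not reduce the number of files that accumulate across runs.

```python
def append_run_to_table(df, table_name):
    """Append one run's rows to an existing Hive table as a single file.

    Assumptions: df is a Spark DataFrame whose column order matches the
    table, and table_name is a placeholder such as "<db>.<tab_market>".
    """
    # coalesce(1) collapses the output to a single partition, so the write
    # produces one file, without the full shuffle that repartition(1) does.
    # insertInto matches columns by position, not by name.
    df.coalesce(1).write.mode("append").insertInto(table_name)
```

For a one-row dataframe the difference between coalesce(1) and repartition(1) is negligible; either works here.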

Merging files:

Using Hive:

If you already have a Hive table on top of the user_id/employe_db/market_table/ directory, run an insert overwrite that selects from the same table.

spark.sql("insert overwrite table <db>.<tab_market> select * from <db>.<tab_market>")

-- To create only one file, use order by:

spark.sql("insert overwrite table <db>.<tab_market> select * from <db>.<tab_market> order by <column>")

You can also run the same insert statements as-is in Hive.

(or)

Using Spark:

As a post-ingestion step, you can read the parquet files back from the directory, repartition them, and write them to the directory again.

df_src = spark.read.parquet("<directory>")
df_src.repartition(<number>).write.mode("overwrite").parquet("<directory>")

NOTE

  • overwrite deletes the directory first, so if the job fails partway through, you may lose data.
  • It's best practice to back up the data to a tmp directory first, and only then overwrite.
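
The backup advice above can be folded into the compaction itself by staging the compacted copy in a separate directory before touching the original. The following is a minimal sketch, not a definitive implementation: the helper names target_file_count and compact_parquet_dir, the tmp_dir argument, and the 128 MB default file size are all assumptions made for illustration. Staging through a second path also sidesteps the case where Spark refuses to overwrite a path that the same job is still reading from.

```python
import math


def target_file_count(total_bytes, target_file_bytes=128 * 1024 * 1024):
    """Pick a file count so each output file is roughly target_file_bytes.

    The 128 MB default is an assumption; tune it to your cluster.
    Always returns at least 1.
    """
    return max(1, math.ceil(total_bytes / target_file_bytes))


def compact_parquet_dir(spark, src_dir, tmp_dir, num_files):
    """Rewrite src_dir as num_files parquet files, staging through tmp_dir.

    spark is assumed to be an active SparkSession; src_dir and tmp_dir are
    placeholder paths and must be different directories.
    """
    # Step 1: write the compacted copy to tmp_dir. If this fails,
    # src_dir has not been modified.
    (spark.read.parquet(src_dir)
        .repartition(num_files)
        .write.mode("overwrite")
        .parquet(tmp_dir))
    # Step 2: only after staging succeeds, replace the original. If this
    # fails midway, tmp_dir still holds a complete copy of the data.
    (spark.read.parquet(tmp_dir)
        .repartition(num_files)
        .write.mode("overwrite")
        .parquet(src_dir))
```

For a table that only ever holds a handful of one-row files, target_file_count returns 1, so this reduces to the repartition(1) case. Remove tmp_dir once the rewritten data in src_dir has been verified.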