2 votes

I have a Hive table that is partitioned:

CREATE EXTERNAL TABLE IF NOT EXISTS CUSTOMER_PART (
  NAME string,
  AGE int,
  YEAR int)
PARTITIONED BY (CUSTOMER_ID decimal(15,0))
STORED AS PARQUET
LOCATION 'HDFS LOCATION';

The initial load is done from Oracle to Hive via PySpark using:

INSERT OVERWRITE TABLE CUSTOMER_PART PARTITION (CUSTOMER_ID) SELECT NAME, AGE, YEAR, CUSTOMER_ID FROM CUSTOMER;

This works fine and creates the partitions dynamically during the run. However, loading the data incrementally every day creates an individual file for each single record under the partition:

INSERT INTO TABLE CUSTOMER_PART PARTITION (CUSTOMER_ID = 3) SELECT NAME, AGE, YEAR FROM CUSTOMER WHERE CUSTOMER_ID = 3; --Assume this gives me the latest record in the database

Is there a way to append the value to the existing Parquet file under the partition until it reaches the block size, without creating smaller files for each insert?

Rewriting the whole partition is one option, but I would prefer not to do this:

INSERT OVERWRITE TABLE CUSTOMER_PART PARTITION (CUSTOMER_ID = 3) SELECT NAME, AGE, YEAR FROM CUSTOMER WHERE CUSTOMER_ID = 3;

The following properties are set for Hive:

set hive.execution.engine=tez; -- TEZ execution engine
set hive.merge.tezfiles=true; -- Notifying that merge step is required
set hive.merge.smallfiles.avgsize=128000000; --128MB
set hive.merge.size.per.task=128000000; -- 128MB

These still don't help with the daily inserts. Any alternative approach that can be followed would be really helpful.

"Is there a possibility to have the value appended to the existing parquet file" >> no. HDFS stores immutable files (with edge cases for appending to/truncating CSV files). And columnar formats such as Parquet store their data in a complex way with a "footer" that terminates the file (with edge cases for concatenating existing file fragments and rebuilding the footer). Search about "Hoodie" / "Hudi" from Uber engineering blog (they changed the name of the project at some point) to understand the problem and how complex it is to make incremental updates... - Samson Scharfrichter

2 Answers

1 vote

As far as I know, we can't keep a single file for daily partition data, since each day's insert will be stored as a separate part file under the partition.

Since you mention that you are importing the data from an Oracle DB, you can import the entire data each time from Oracle and overwrite it into HDFS. That way you can maintain a single part file.

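In other words, rerun the question's full load each day instead of the incremental insert; a minimal sketch of that full overwrite:

INSERT OVERWRITE TABLE CUSTOMER_PART PARTITION (CUSTOMER_ID)
SELECT NAME, AGE, YEAR, CUSTOMER_ID FROM CUSTOMER;  -- full reload, replacing every partition's files
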
Also, HDFS is not recommended for small amounts of data.

1 vote

I could think of the following approaches for this case:

Approach 1:

Recreate the Hive table, i.e. after loading the incremental data into the CUSTOMER_PART table:

  • Create a temp_CUSTOMER_PART table with the entire snapshot of the CUSTOMER_PART table data.

  • Overwrite the final CUSTOMER_PART table by selecting from the temp_CUSTOMER_PART table (see the sketch after these steps).

  • This way you end up with a final table without small files in it.

  • NOTE: you need to make sure no new data is being inserted into the CUSTOMER_PART table after the temp table has been created.

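A minimal HiveQL sketch of these steps; the SET line is an assumption (dynamic-partition mode may already be configured in your cluster):

CREATE TABLE temp_CUSTOMER_PART STORED AS PARQUET AS
SELECT * FROM CUSTOMER_PART;                                   -- full snapshot of the current data

SET hive.exec.dynamic.partition.mode=nonstrict;                -- assumed: allow fully dynamic partition overwrite

INSERT OVERWRITE TABLE CUSTOMER_PART PARTITION (CUSTOMER_ID)
SELECT NAME, AGE, YEAR, CUSTOMER_ID FROM temp_CUSTOMER_PART;   -- rewrite each partition into fewer, larger files

DROP TABLE temp_CUSTOMER_PART;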

Approach 2:

Using the input_file_name() function:

  • Check how many distinct file names there are in each partition, then select only the partitions that have more than e.g. 10 files in them (see the sketch after these steps).

  • Create a temporary table with these partitions and overwrite only the selected partitions of the final table.

  • NOTE: you need to make sure no new data is being inserted into the CUSTOMER_PART table after the temp table has been created, because we are going to overwrite the final table.

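A minimal Spark SQL sketch of the file-count check, assuming the table is read through Spark (the threshold of 10 files is just an example):

SELECT CUSTOMER_ID, COUNT(DISTINCT fname) AS file_cnt
FROM (
  SELECT CUSTOMER_ID, input_file_name() AS fname   -- file backing each row
  FROM CUSTOMER_PART
) t
GROUP BY CUSTOMER_ID
HAVING COUNT(DISTINCT fname) > 10;                 -- partitions worth compacting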

Approach 3:

Hive (not Spark) offers overwriting and selecting from the same table, i.e.:

insert overwrite table default.t1 partition(partition_column)
select * from default.t1;   -- overwrite and select from the same t1 table
  • If you follow this approach, a Hive job needs to be triggered once your Spark job finishes.

  • Hive will acquire a lock while running the overwrite/select on the same table, so any job that writes to the table will have to wait.

In addition: the ORC format offers CONCATENATE, which will merge small ORC files to create a new, larger file.

alter table <db_name>.<orc_table_name> [partition (partition_column="val")] concatenate;