I have a Cloudera cluster on which I am accumulating large amounts of data in a Hive table stored as Parquet. The table is partitioned by an integer batch_id. My workflow for inserting a new batch of rows is to first load them into a staging table, then insert from there into the large accumulating table. I use a PySpark script running in local mode to do this. The script is essentially:
from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext()
hc = HiveContext(sc)

# Copy the staged batch into its own partition of the accumulating table
hc.sql(
    """
    INSERT INTO largeAccumulatorTable
    PARTITION (batch_id = {0})
    SELECT * FROM stagingBatchId{0}
    """
    .format(batch_id)
)
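For context, the staging table for each batch is populated from the incoming rows before the INSERT above runs. A simplified sketch of that step is below; the input path, DataFrame name, and temp table name are placeholders, not my real ones:

# Sketch only: "batch_df" and the input path stand in for however the batch arrives
batch_df = hc.read.parquet("/path/to/incoming/batch_{0}".format(batch_id))
batch_df.registerTempTable("newBatch")
hc.sql(
    "CREATE TABLE stagingBatchId{0} STORED AS PARQUET AS SELECT * FROM newBatch"
    .format(batch_id)
)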
I execute it using this shell script:
#!/bin/bash
spark-submit \
--master local[*] \
--num-executors 8 \
--executor-cores 1 \
--executor-memory 2G \
spark_insert.py
I have noticed that the resulting Parquet files in the large accumulating table are numerous and very small (some just a few KB). I want the opposite: a small number of large Parquet files. I've tried setting different Hive configuration values at runtime in PySpark, to no avail (a sketch of how I apply them follows the list):
- Set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
- Set mapred.map.tasks to a small number
- Set num-executors to a small number
- Use local[1] master instead of local[*]
- Set mapreduce.input.fileinputformat.split.minsize and mapreduce.input.fileinputformat.split.maxsize to high values
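For reference, this is roughly how I have been applying the Hive/MapReduce settings above before running the INSERT (a sketch; the exact values varied between attempts). The num-executors and master changes were made on the spark-submit line shown earlier.

# Roughly how I set the configs listed above (the values here are examples)
hc.setConf("hive.input.format",
           "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat")
hc.setConf("mapred.map.tasks", "1")
hc.setConf("mapreduce.input.fileinputformat.split.minsize", "268435456")  # 256 MB
hc.setConf("mapreduce.input.fileinputformat.split.maxsize", "268435456")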
None of these changes had any effect on the number or sizes of Parquet files. However, when I open Cloudera Hue and enter this simple statement:
INSERT INTO largeAccumulatorTable
PARTITION (batch_id = XXX)
SELECT * FROM stagingBatchIdXXX
It works exactly as I would hope, producing a small number of Parquet files that are all about 100 MB.
What am I doing wrong in PySpark? How can I make it produce the same result as the statement run from Hue? Thanks!