I have a Cloudera cluster on which I am accumulating large amounts of data in a Hive table stored as Parquet. The table is partitioned by an integer batch_id. My workflow for inserting a new batch of rows is to first insert them into a staging table, then insert from there into the large accumulating table. I do this with a PySpark script run in local mode. The script is essentially:

import pyspark
from pyspark.sql import HiveContext

sc = pyspark.SparkContext()
hc = HiveContext(sc)

# batch_id is assumed to be defined earlier in the script;
# copy the staged rows into the corresponding partition of the accumulating table
hc.sql(
    """
    INSERT INTO largeAccumulatorTable
    PARTITION (batch_id = {0})
    SELECT * FROM stagingBatchId{0}
    """
    .format(batch_id)
)

I execute it using this shell script:

#!/bin/bash
spark-submit \
    --master local[*] \
    --num-executors 8 \
    --executor-cores 1 \
    --executor-memory 2G \
    spark_insert.py

I have noticed that the resulting Parquet files in the large accumulating table are very small (some just a few KB) and numerous. I want the opposite: few, large Parquet files. I've tried setting different Hive and Spark configuration values, both at runtime in PySpark and via spark-submit, to no avail (a rough sketch of the runtime settings follows the list):

  • Set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
  • Set mapred.map.tasks to a small number
  • Set num-executors to a small number
  • Use local[1] master instead of local[*]
  • Set mapreduce.input.fileinputformat.split.minsize and mapreduce.input.fileinputformat.split.maxsize to high values
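
For completeness, this is roughly how I applied the runtime settings from PySpark (the exact values here are just examples of what I tried):

hc.setConf("hive.input.format",
           "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat")
hc.setConf("mapred.map.tasks", "1")                                       # a small number
hc.setConf("mapreduce.input.fileinputformat.split.minsize", "134217728")  # 128 MB
hc.setConf("mapreduce.input.fileinputformat.split.maxsize", "268435456")  # 256 MB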

None of these changes had any effect on the number or sizes of Parquet files. However, when I open Cloudera Hue and enter this simple statement:

INSERT INTO largeAccumulatorTable
PARTITION (batch_id = XXX)
SELECT * FROM stagingBatchIdXXX

It works exactly as I would hope, producing a small number of Parquet files that are all about 100 MB.

What am I doing wrong in Pyspark? How can I make it achieve the same result as in Hue? Thanks!

1 Answer

Spark's default number of shuffle partitions is 200. Based on your data size, try reducing or increasing that value, e.g. sqlContext.sql("set spark.sql.shuffle.partitions=20").
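
In the question's script (using its hc HiveContext), that would look roughly like this; 20 is only a starting point and should be tuned to the amount of data per batch:

hc.sql("SET spark.sql.shuffle.partitions=20")  # lower the shuffle partition count before the INSERT
hc.sql(
    """
    INSERT INTO largeAccumulatorTable
    PARTITION (batch_id = {0})
    SELECT * FROM stagingBatchId{0}
    """
    .format(batch_id)
)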