16
votes

I have a large (about 85 GB compressed) gzipped file from s3 that I am trying to process with Spark on AWS EMR (right now with an m4.xlarge master instance and two m4.10xlarge core instances each with a 100 GB EBS volume). I am aware that gzip is a non-splittable file format, and I've seen it suggested that one should repartition the compressed file because Spark initially gives an RDD with one partition. However, after doing

scala> val raw = spark.read.format("com.databricks.spark.csv").
     | options(Map("delimiter" -> "\\t", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")).
     | load("s3://path/to/file.gz").
     | repartition(sc.defaultParallelism * 3)
raw: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: string, _c1: string ... 48 more fields]
scala> raw.count()

and taking a look at the Spark application UI, I still see only one active executor (the other 14 are dead) with one task, and the job never finishes (or at least I've not waited long enough for it to).

  • What is going on here? Can someone help me understand how Spark is working in this example?
  • Should I be using a different cluster configuration?
  • Unfortunately, I have no control over the mode of compression, but is there an alternative way of dealing with such a file?

4 Answers

11
votes

If the file format is not splittable, then there's no way to avoid reading the file in its entirety on one core. In order to parallelize work, you have to know how to assign chunks of work to different computers. In the gzip case, suppose you divide it up into 128 MB chunks. The nth chunk needs the (n-1)th chunk's position information to know how to decompress itself, which in turn depends on the (n-2)th chunk, and so on down to the first.

If you want to parallelize, you need to make this file splittable. One way is to unzip it and process it uncompressed, or you can unzip it, split it into several files (one file for each parallel task you want), and gzip each file.
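For example, here is a minimal sketch of the second approach, assuming the file has already been copied somewhere local and using placeholder names (input.gz, part_00000.gz, ...) and a placeholder chunk size:

import gzip
import itertools

INPUT = 'input.gz'           # placeholder: the downloaded gzip file
LINES_PER_CHUNK = 5_000_000  # placeholder: tune to get roughly one chunk per task

with gzip.open(INPUT, 'rt') as f_in:
    for i in itertools.count():
        # Pull the next batch of lines from the single decompressed stream.
        chunk = list(itertools.islice(f_in, LINES_PER_CHUNK))
        if not chunk:
            break
        # Each part is an independent gzip stream, so Spark can assign one
        # file per task when it reads the whole directory of parts.
        with gzip.open(f'part_{i:05d}.gz', 'wt') as f_out:
            f_out.writelines(chunk)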

2
votes

Spark cannot parallelize reading a single gzip file.

The best you can do is split it into chunks that are gzipped.

However, Spark is really slow at reading gzip files. You can do this to speed it up:

import gzip

file_names_rdd = sc.parallelize(list_of_files, 100)  # list_of_files: paths the executors can open directly
lines_rdd = file_names_rdd.flatMap(lambda path: gzip.open(path, 'rt').readlines())

Going through Python this way is about twice as fast as using the native Spark gzip reader.
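
If you then need a DataFrame like the one in the question (tab-delimited), a rough continuation might look like this; the tuple/split handling is my assumption (the answer stops at the lines RDD), it does not handle CSV quoting, and spark is assumed to be the active SparkSession:

# Split each tab-delimited line into a tuple of fields and let Spark infer a
# positional schema (_1, _2, ...).
rows_rdd = lines_rdd.map(lambda line: tuple(line.rstrip('\n').split('\t')))
df = spark.createDataFrame(rows_rdd)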

0
votes

I have faced this problem and here is the solution.

The best way to approach this problem is to unzip the .gz file before the Spark batch runs, and then point Spark at the uncompressed file so it can parallelize the read.

Code to unzip the .gz file:

import gzip
import shutil

# Decompress file.txt.gz into file.txt.
with gzip.open('file.txt.gz', 'rb') as f_in, open('file.txt', 'wb') as f_out:
    shutil.copyfileobj(f_in, f_out)
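
Once the uncompressed file has been written back somewhere Spark can read it (the path below is a placeholder), Spark splits it across tasks on its own; a minimal sketch:

# Placeholder path; point this at wherever the unzipped file was uploaded.
df = (
    spark.read
    .option('sep', '\t')
    .csv('s3://path/to/file.txt')
)
print(df.rdd.getNumPartitions())  # should now be well above 1
df.count()
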
0
votes

The solution I've used is the decompression codec SplittableGzipCodec by Niels Basjes. This codec feeds the same file to multiple Spark tasks: each task 'fast forwards' to a specific offset in the gzip file and then begins processing records from there. Running multiple tasks against the same gzip file significantly decreases the wall clock time and increases the chances the gunzip succeeds, at the small cost of increasing the total core hours used. Brilliant. I've tested it on 20-50 GB files.

The Spark solution is described in detail here: https://github.com/nielsbasjes/splittablegzip/blob/master/README-Spark.md

# splittable-gzip.py
from pyspark.sql import SparkSession


if __name__ == '__main__':
    spark = (
        SparkSession.builder
        # If you want to change the split size, you need to use this config
        # instead of mapreduce.input.fileinputformat.split.maxsize.
        # I don't think Spark DataFrames offer an equivalent setting for
        # mapreduce.input.fileinputformat.split.minsize.
        .config('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
        .getOrCreate()
    )

    print(
        spark.read
        # You can also specify this option against the SparkSession.
        .option('io.compression.codecs', 'nl.basjes.hadoop.io.compress.SplittableGzipCodec')
        .csv(...)
        .count()
    )
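
One note from me rather than from the answer above: the codec class also has to be on the driver and executor classpath. A sketch of one way to do that from the script itself, assuming the Maven coordinates given in the project's README (verify the version there before relying on it):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Assumed coordinates for the codec jar (the config equivalent of
    # spark-submit --packages); check the linked README for the current version.
    .config('spark.jars.packages', 'nl.basjes.hadoop:splittablegzip:1.3')
    .config('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
    .getOrCreate()
)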