
The goal: Millions of rows in Cassandra need to be extracted and compressed into a single file as quickly and efficiently as possible (on a daily basis).

The current setup uses a Google Dataproc cluster to run a Spark job that extracts the data directly into a Google Cloud Storage bucket. I've tried two approaches:

  1. Using the (now deprecated) FileUtil.copyMerge() to combine the roughly 9000 Spark partition files into a single uncompressed file (see the sketch after this list), then submitting a Hadoop MapReduce job to compress that single file.

  2. Leaving the roughly 9000 Spark partition files as the raw output, and submitting a Hadoop MapReduce job to merge and compress those files into a single file.
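For reference, the copyMerge() step in approach 1 looks roughly like the sketch below (the bucket paths and the deleteSource/addString arguments are placeholders, not the actual job's values; it assumes Hadoop 2.x, where the method still exists):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

// Merge the ~9000 part-* files produced by Spark into one uncompressed file.
// Note: copyMerge() is deprecated in Hadoop 2.8 and removed in 3.x.
val conf    = new Configuration()
val srcDir  = new Path("gs://bucket/spark-output/")       // placeholder: directory of part-* files
val dstFile = new Path("gs://bucket/merged/extract.csv")  // placeholder: single merged file

FileUtil.copyMerge(
  srcDir.getFileSystem(conf), srcDir,
  dstFile.getFileSystem(conf), dstFile,
  false,   // deleteSource: keep the original part files
  conf,
  null)    // addString: nothing appended between files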

Some job details: about 800 million rows; roughly 9000 Spark partition files output by the Spark job; the Spark job takes about an hour to complete on a 1 master, 4 worker (4 vCPU, 15 GB each) Dataproc cluster; the Hadoop block size is the Dataproc default, which I believe is 128 MB.

Some Spark configuration details:

spark.task.maxFailures=10
spark.executor.cores=4

spark.cassandra.input.consistency.level=LOCAL_ONE
spark.cassandra.input.reads_per_sec=100
spark.cassandra.input.fetch.size_in_rows=1000
spark.cassandra.input.split.size_in_mb=64
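
The extract itself goes through the Spark Cassandra Connector; a rough sketch of the read/write path, assuming the Dataframe API and with made-up keyspace, table, and bucket names, would look something like this:

import org.apache.spark.sql.SparkSession

// The spark.cassandra.* settings above are assumed to be passed via
// spark-submit --conf; keyspace/table/bucket names here are placeholders.
val spark = SparkSession.builder()
  .appName("cassandra-extract")
  .getOrCreate()

val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "my_keyspace", "table" -> "my_table"))
  .load()

// Produces one CSV part file per Spark partition (~9000 files) in the bucket.
df.write.csv("gs://bucket/spark-output/")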

The Hadoop job:

hadoop jar file:///usr/lib/hadoop-mapreduce/hadoop-streaming-2.8.4.jar \
    -Dmapred.reduce.tasks=1 \
    -Dmapred.output.compress=true \
    -Dmapred.compress.map.output=true \
    -Dstream.map.output.field.separator=, \
    -Dmapred.textoutputformat.separator=, \
    -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
    -input gs://bucket/with/either/single/uncompressed/csv/or/many/spark/partition/file/csvs \
    -output gs://output/bucket \
    -mapper /bin/cat \
    -reducer /bin/cat \
    -inputformat org.apache.hadoop.mapred.TextInputFormat \
    -outputformat org.apache.hadoop.mapred.TextOutputFormat
  1. The Spark job took about 1 hour to extract the Cassandra data to the GCS bucket. The FileUtil.copyMerge() step added about 45 minutes to that; it ran on the Dataproc cluster but underutilized its resources, as it only seems to use 1 node. The Hadoop job to compress that single file took an additional 50 minutes. This is not an optimal approach, as the cluster has to stay up longer even though it is not using its full resources.

The info output from that job:

INFO mapreduce.Job: Counters: 55
File System Counters
    FILE: Number of bytes read=5072098452
    FILE: Number of bytes written=7896333915
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    GS: Number of bytes read=47132294405
    GS: Number of bytes written=2641672054
    GS: Number of read operations=0
    GS: Number of large read operations=0
    GS: Number of write operations=0
    HDFS: Number of bytes read=57024
    HDFS: Number of bytes written=0
    HDFS: Number of read operations=352
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=0
Job Counters 
    Killed map tasks=1
    Launched map tasks=353
    Launched reduce tasks=1
    Rack-local map tasks=353
    Total time spent by all maps in occupied slots (ms)=18495825
    Total time spent by all reduces in occupied slots (ms)=7412208
    Total time spent by all map tasks (ms)=6165275
    Total time spent by all reduce tasks (ms)=2470736
    Total vcore-milliseconds taken by all map tasks=6165275
    Total vcore-milliseconds taken by all reduce tasks=2470736
    Total megabyte-milliseconds taken by all map tasks=18939724800
    Total megabyte-milliseconds taken by all reduce tasks=7590100992
Map-Reduce Framework
    Map input records=775533855
    Map output records=775533855
    Map output bytes=47130856709
    Map output materialized bytes=2765069653
    Input split bytes=57024
    Combine input records=0
    Combine output records=0
    Reduce input groups=2539721
    Reduce shuffle bytes=2765069653
    Reduce input records=775533855
    Reduce output records=775533855
    Spilled Records=2204752220
    Shuffled Maps =352
    Failed Shuffles=0
    Merged Map outputs=352
    GC time elapsed (ms)=87201
    CPU time spent (ms)=7599340
    Physical memory (bytes) snapshot=204676702208
    Virtual memory (bytes) snapshot=1552881852416
    Total committed heap usage (bytes)=193017675776
Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
File Input Format Counters 
    Bytes Read=47132294405
File Output Format Counters 
    Bytes Written=2641672054 
  2. I expected this to perform as well as or better than the first approach, but it performed much worse. The Spark job remained unchanged. Skipping the FileUtil.copyMerge() and jumping straight into the Hadoop MapReduce job, the map portion was only at about 50% complete after an hour and a half. The job was cancelled at that point, as it was clearly not going to be viable.

I have complete control over the Spark job and the Hadoop job. I know we could create a bigger cluster, but I'd rather do that only after making sure the job itself is optimized. Any help is appreciated. Thanks.


1 Answer


Can you provide some more details of your Spark job? Which Spark API are you using, RDD or Dataframe? Why not perform the merge phase completely in Spark (with repartition().write()) and avoid chaining Spark and MR jobs?
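
For instance, a minimal sketch of that all-in-Spark approach (assuming the Dataframe API; keyspace, table, and bucket names are placeholders):

// Read from Cassandra, then repartition to a single partition and write one
// gzip-compressed CSV directly, skipping copyMerge() and the streaming job.
val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "my_keyspace", "table" -> "my_table"))
  .load()

df.repartition(1)
  .write
  .option("compression", "gzip")
  .csv("gs://output/bucket/")

The result is still a single part-00000-*.csv.gz file inside the output directory, so a rename may still be needed afterwards, but the merge and compression happen entirely within the Spark job.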