The goal: Millions of rows in Cassandra need to be extracted and compressed into a single file as quickly and efficiently as possible (on a daily basis).
The current setup uses a Google Dataproc cluster to run a Spark job that extracts the data directly into a Google Cloud Storage bucket. I've tried two approaches:
Approach 1: Using the (now deprecated) FileUtil.copyMerge() to combine the roughly 9,000 Spark partition files into a single uncompressed file, then submitting a Hadoop MapReduce job to compress that single file.
Approach 2: Leaving the roughly 9,000 Spark partition files as the raw output, and submitting a Hadoop MapReduce job to merge and compress those files into a single file.
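For reference, here is a minimal local sketch of what both approaches boil down to, with plain Python standing in for copyMerge/MapReduce (the function name and file paths are hypothetical; the real jobs do this over GCS with the Hadoop APIs): concatenate many part files into one, then gzip the result.

```python
import glob
import gzip
import shutil

def merge_and_compress(part_glob: str, out_path: str) -> None:
    """Concatenate part files (sorted for determinism) into one gzip file.

    A local stand-in for the copyMerge-then-compress pipeline described
    above; streams file-by-file so nothing is read fully into memory.
    """
    with gzip.open(out_path, "wb") as out:
        for part in sorted(glob.glob(part_glob)):
            with open(part, "rb") as src:
                shutil.copyfileobj(src, out)
```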
Some job details: About 800 million rows. Roughly 9,000 partition files output by the Spark job. The Spark job takes about an hour to complete running on a 1-master, 4-worker (4 vCPUs, 15 GB each) Dataproc cluster. Default Dataproc Hadoop block size, which I believe is 128 MB.
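Some back-of-envelope numbers implied by these figures (the ~47 GB total comes from the GS bytes-read counter in the job output further down):

```python
rows = 800_000_000
partitions = 9_000
total_bytes = 47_132_294_405          # GS: Number of bytes read, from the counters below
block = 128 * 1024 * 1024             # assumed 128 MB block size

rows_per_part = rows / partitions                      # ~89k rows per part file
mb_per_part = total_bytes / partitions / 2**20         # ~5 MiB per part file
parts_per_block = block / (total_bytes / partitions)   # ~25 part files per block
```

So each part file is well under the block size, which is the usual small-files overhead situation.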
Some Spark configuration details:
spark.task.maxFailures=10
spark.executor.cores=4
spark.cassandra.input.consistency.level=LOCAL_ONE
spark.cassandra.input.reads_per_sec=100
spark.cassandra.input.fetch.size_in_rows=1000
spark.cassandra.input.split.size_in_mb=64
The Hadoop job:
hadoop jar file:///usr/lib/hadoop-mapreduce/hadoop-streaming-2.8.4.jar
-Dmapred.reduce.tasks=1
-Dmapred.output.compress=true
-Dmapred.compress.map.output=true
-Dstream.map.output.field.separator=,
-Dmapred.textoutputformat.separator=,
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
-input gs://bucket/with/either/single/uncompressed/csv/or/many/spark/partition/file/csvs
-output gs://output/bucket
-mapper /bin/cat
-reducer /bin/cat
-inputformat org.apache.hadoop.mapred.TextInputFormat
-outputformat org.apache.hadoop.mapred.TextOutputFormat
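One property of the GzipCodec output worth keeping in mind (this is a fact about the gzip format, not something from my jobs): gzip is not splittable, so a single merged .gz can only be consumed by one mapper downstream. On the other hand, a gzip file may contain multiple members, so independently gzipped chunks concatenated byte-for-byte still form a valid gzip stream. A quick local check:

```python
import gzip

# Two independently compressed chunks...
a = gzip.compress(b"row1\nrow2\n")
b = gzip.compress(b"row3\n")

# ...concatenated as raw bytes still decompress as one stream,
# because a gzip file may contain multiple members (RFC 1952).
merged = a + b
assert gzip.decompress(merged) == b"row1\nrow2\nrow3\n"
```

This is why gzipped part files can in principle be concatenated directly into one valid .gz without recompressing anything.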
- Approach 1: The Spark job took about 1 hour to extract the Cassandra data to the GCS bucket. The FileUtil.copyMerge() step added about 45 minutes to that; it ran on the Dataproc cluster but underutilized its resources, since it appears to use only one node. The Hadoop job to compress the resulting single file took an additional 50 minutes. This is not an optimal approach: the cluster has to stay up longer even though it is not using its full resources.
The INFO output from that compression job:
INFO mapreduce.Job: Counters: 55
File System Counters
FILE: Number of bytes read=5072098452
FILE: Number of bytes written=7896333915
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
GS: Number of bytes read=47132294405
GS: Number of bytes written=2641672054
GS: Number of read operations=0
GS: Number of large read operations=0
GS: Number of write operations=0
HDFS: Number of bytes read=57024
HDFS: Number of bytes written=0
HDFS: Number of read operations=352
HDFS: Number of large read operations=0
HDFS: Number of write operations=0
Job Counters
Killed map tasks=1
Launched map tasks=353
Launched reduce tasks=1
Rack-local map tasks=353
Total time spent by all maps in occupied slots (ms)=18495825
Total time spent by all reduces in occupied slots (ms)=7412208
Total time spent by all map tasks (ms)=6165275
Total time spent by all reduce tasks (ms)=2470736
Total vcore-milliseconds taken by all map tasks=6165275
Total vcore-milliseconds taken by all reduce tasks=2470736
Total megabyte-milliseconds taken by all map tasks=18939724800
Total megabyte-milliseconds taken by all reduce tasks=7590100992
Map-Reduce Framework
Map input records=775533855
Map output records=775533855
Map output bytes=47130856709
Map output materialized bytes=2765069653
Input split bytes=57024
Combine input records=0
Combine output records=0
Reduce input groups=2539721
Reduce shuffle bytes=2765069653
Reduce input records=775533855
Reduce output records=775533855
Spilled Records=2204752220
Shuffled Maps =352
Failed Shuffles=0
Merged Map outputs=352
GC time elapsed (ms)=87201
CPU time spent (ms)=7599340
Physical memory (bytes) snapshot=204676702208
Virtual memory (bytes) snapshot=1552881852416
Total committed heap usage (bytes)=193017675776
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=47132294405
File Output Format Counters
Bytes Written=2641672054
- Approach 2: I expected this to perform as well as or better than the first approach, but it performed much worse. The Spark job remained unchanged. Skipping FileUtil.copyMerge() and jumping straight into the Hadoop MapReduce job, the map portion was only at about 50% after an hour and a half. The job was cancelled at that point, as it was clearly not going to be viable.
I have complete control over the Spark job and the Hadoop job. I know we could create a bigger cluster, but I'd rather do that only after making sure the job itself is optimized. Any help is appreciated. Thanks.