1 vote

From the log I can see that there are 182k rows (70 MB). It takes 1.5 hours to load 70 MB of data and ~7.5 hours (started 15/11/14 01:58:28, ended 15/11/14 09:19:09) to train on 182k rows on Dataproc. Loading the same data and running the same algorithm on my local machine takes 3 minutes.

Dataproc log

15/11/13 23:27:09 INFO com.google.cloud.hadoop.io.bigquery.ShardedExportToCloudStorage: Table 'mydata-data:website_wtw_feed.video_click20151111' to be exported has 182712 rows and 70281790 bytes
15/11/13 23:28:13 WARN akka.remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:60749] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 

15/11/14 01:58:28 INFO com.dailymotion.recommender.BigQueryRecommender: Fetching the Ratings RDD
15/11/14 01:58:28 INFO com.dailymotion.recommender.BigQueryRecommender: Transforming the video feature matrix
15/11/14 01:58:28 INFO com.dailymotion.recommender.BigQueryRecommender: Training ALS Matrix factorization Model


[Stage 2:=============================>                             (1 + 1) / 2]

15/11/14 09:19:09 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
15/11/14 09:19:09 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS

15/11/14 09:19:44 INFO com.dailymotion.recommender.BigQueryRecommender: Transforming the video feature matrix
15/11/14 09:19:44 INFO com.dailymotion.recommender.BigQueryRecommender: Transforming the user feature matrix
  1. Copied the data to the local machine

    r.viswanadha$ gsutil cp -r gs://<mycompany>-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000 .
    
    
    Copying gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/data-000000000000.json... 
    
    Downloading ...201511132327_0000/shard-0/data-000000000000.json: 141.3 MiB/141.3 MiB      
    
    Copying gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/data-000000000001.json... 
    
    Copying gs://<mycompany>-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-1/data-000000000000.json...
    
  2. Ran the same algorithm. The ALS train step took ~3 minutes.

    com.dailymotion.recommender.BigQueryRecommender --app_name BigQueryRecommenderTest --master local[4] --input_dir /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/input/job_201511132327_0000/shard-0/ 
    

First Run

15/11/14 13:19:36 INFO BigQueryRecommender: Training implicit features for the ALS Matrix factorization Model
...
15/11/14 13:22:24 INFO BigQueryRecommender: Transforming the video feature matrix

Second Run

15/11/14 13:29:05 INFO BigQueryRecommender: Training implicit features for the ALS Matrix factorization Model


...

15/11/14 13:31:57 INFO BigQueryRecommender: Transforming the video feature matrix

The Dataproc cluster has 1 master and 3 workers, each with 104 GB RAM and 16 CPUs.

My local machine has 8 GB RAM and a 2-core 2.7 GHz Core i5.

gsutil ls -l -r -h  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000

gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/: 

gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/: 

    0 B  2015-11-13T23:27:13Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/ 

    141.3 MiB  2015-11-13T23:29:21Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/data-000000000000.json 

   0 B  2015-11-13T23:29:21Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/data-000000000001.json 

gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-1/: 

    0 B  2015-11-13T23:27:13Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-1/ 

    0 B  2015-11-13T23:28:47Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-1/data-000000000000.json 

   0 B  2015-11-13T23:27:09Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/ 

TOTAL: 6 objects, 148165416 bytes (141.3 MiB)
Hi @Ram - What was the size of your Dataproc cluster (nodes and machine types used?) - James
Also, could you check how much was slowness in the BigQuery export vs how much was the actual processing? Especially if you still have that BigQuery export data available in gs://<mycompany>-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000 then you can try using that as the file path on the Dataproc job without using the BigQueryInputFormat. - Dennis Huo
Also, some info that could help would be if you run gsutil ls -l -r gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/ and report the sizes and timestamps. If you prefer not to share that information here, please send it to [email protected] to only provide that information to Google engineers. - Dennis Huo
1. Size of cluster: 1 Master, 3 Slaves. Each machine has 104 GB (RAM), 16 CPUs. 2. newHadoopAPI completes in ~1.5hrs, ALS Train takes about 7.5 hrs - Ram
$ gsutil ls -l -r -h gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000 gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/: gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/: 141.3 MiB 2015-11-13T23:29:21Z gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/data-000000000000.json ... gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-1/: ... TOTAL: 6 objects, 148165416 bytes (141.3 MiB) - Ram

2 Answers

3 votes

To recap some of the offline findings: when things run orders of magnitude slower on a distributed cluster than on a local setup, the main bottlenecks to look for are I/O round-trip latencies, both from cross-network service dependencies and from disk and local I/O.

Things to look for in general (some of which may or may not be applicable to your specific case, but may be common to others encountering similar issues):

  1. Make sure the GCS bucket holding the data is in the same region as the GCE zone in which you deployed your Dataproc cluster; check with gsutil ls -L gs://[your-bucket]. Cross-continental traffic is not only significantly slower, but may also incur additional network costs in your project.
  2. If your job has any other network dependencies, such as querying APIs or a separate database running on GCE, try to colocate them in the same zone; even within the same continent, GCE cross-region traffic can have tens of milliseconds of round-trip latency, which adds up significantly if per-record requests are made (for example, 30 ms × 180k records comes out to 1.5 hours).
  3. Even though this may not have been applicable to your specific case this time, remember to avoid per-record round-trip I/O to GCS via Hadoop FileSystem interfaces where possible. Overall throughput to GCS is very scalable, but by the nature of remote storage, round-trip latencies (30–100 ms) are much slower than what you would measure on a local machine, where reads often hit the OS buffer cache, or where a laptop SSD can sustain high volumes of sub-millisecond round trips.
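To make the round-trip arithmetic in items 2 and 3 concrete, here is a back-of-the-envelope script; the 30 ms figure and the record count come from the discussion above, while the bulk-read throughput is an assumed number for illustration:

```python
# Per-record round trips vs. one bulk streaming read (illustrative).
records = 182_712        # rows in the exported table
round_trip_s = 0.030     # ~30 ms cross-region round trip (from item 2)

per_record_hours = records * round_trip_s / 3600
print(f"per-record: {per_record_hours:.2f} h")  # -> per-record: 1.52 h

# Assumed sustained streaming throughput for one bulk read of the
# ~70 MB export; the 50 MB/s figure is hypothetical.
bulk_seconds = 70 / 50
print(f"bulk read: {bulk_seconds:.1f} s")       # -> bulk read: 1.4 s
```

The gap of several orders of magnitude between the two numbers is the point: latency dominates when requests are issued per record, even though GCS throughput itself is high.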

And in general, for use cases that can sustain very high throughput but suffer long round-trip latencies, make sure to shard out the data with something like repartition() if the data is small or doesn't already partition naturally into enough parallelism to keep your Spark cluster well utilized.
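As a rough illustration of "sufficient parallelism" for the cluster described in this question (3 workers × 16 cores), a common heuristic, not stated in the answer itself, is a few partitions per core:

```python
# Rule-of-thumb partition count: total cores times a small multiplier.
workers = 3
cores_per_worker = 16
multiplier = 3            # assumed; 2-4x per core is a common heuristic

target_partitions = workers * cores_per_worker * multiplier
print(target_partitions)  # -> 144
```

A two-shard BigQuery export read into only one or two partitions leaves almost all of those cores idle, which matches the `(1 + 1) / 2` stage progress seen in the Dataproc log above.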

Finally, our latest Dataproc release fixes a number of native-library configurations, so it may show much better performance for the ALS portion as well as other MLlib use cases.

2 votes

For anyone who hits something similar: when dealing with only a single small object in GCS (or a single shard of data from the BigQuery connector), you can end up with a single partition in your Spark RDD and, as a result, little or no parallelism.

While it adds an extra shuffle stage, the input RDD can be repartitioned right after reading from GCS or BigQuery to obtain the desired number of partitions. Whether the extra shuffle pays off depends on how much processing or I/O each record in the RDD requires.