0 votes

Steps:

1) Read the file from S3 (140-200 GB).

2) Then I apply a filter() to remove most of the records (around 70% will be removed).

3) For the remaining records (around 40-50 GB in total), use map() to create a JavaRDD of MyObj.

4) After that, I use groupBy to group the objects by the sessionId field. For the total of 40-50 GB, I expect to get around 300,000 groups in the groupBy.

5) Then, for each group, I call processSingeGroupOfEvents() (this function takes an Iterable and does some simple work for each of the 300,000 iterables).

6) After that, I use coalesce() and saveAsTextFile() to output the results to S3 (the output file size would be around 1-2 GB).

Pseudocode:

    JavaRDD<MyObj> eventsCollection = context.textFile(input)
            .filter(data -> applySomeFilter(data))      // This filters out ~70% of the records
            .map(data -> createMyObjFromData(data));

    JavaPairRDD<String, Iterable<MyObj>> eventsCollectionGroupedById = eventsCollection
            .groupBy(x -> x.getSessionId())
            .persist(StorageLevel.MEMORY_AND_DISK());

    JavaPairRDD<String, String> groupedByIdResults = eventsCollectionGroupedById
            .mapValues(iterable -> processSingeGroupOfEvents(iterable, additionalVariable1, additionalVariable2));

    groupedByIdResults
            .coalesce(1) // Merge all partitions into a single one (to avoid multiple output files)
            .map(data -> data._2())
            .saveAsTextFile(outputS3Location);
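
For completeness, the driver setup that this pseudocode assumes might look roughly like the sketch below (the class name, app name, and argument handling are illustrative, not the actual job):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.storage.StorageLevel;

    public class SessionAggregationJob {                   // illustrative class name
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("SessionAggregationJob");
            JavaSparkContext context = new JavaSparkContext(conf);

            String input = args[0];            // S3 input location
            String outputS3Location = args[1]; // S3 output location

            // ... the pipeline from the pseudocode above goes here ...

            context.stop();
        }
    }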

My current configuration for the AWS EMR Spark app is:

  • 4 r3.8xlarge core instances
  • 1 r3.8xlarge for the master node
  • EMR release label: emr-5.11.0
  • maximizeResourceAllocation=true
  • Spark version 1.4 (I cannot update Spark to the newest version right now)

For now, it takes about 30-50 minutes to execute such a job. However, in the future I expect the input data to double in size (~300 GB of data => ~600,000 groups to aggregate by session id).

How can I estimate whether my cluster will be able to handle the load if my data doubles? Also, I sometimes get an error saying that the quota limit in my region is exceeded for the r3.8xlarge instance type, so I worry that if I add more hardware I will run into this issue more often.

UPDATE: The processSingeGroupOfEvents() method iterates over a group of events (an Iterable) with the same session id and performs some tricky calculations (for example, computing a running total, finding the max of some elements in the group, parsing timestamps, etc.). It returns a comma-separated string with the aggregated values for that particular session id.
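
To make the update above concrete, a minimal sketch of such a per-session aggregation is shown below; the getValue() getter, the parameter types, and the exact set of aggregates are assumptions, not the real MyObj API:

    // Sketch only: getValue() is an assumed numeric getter on MyObj, and the
    // aggregates (count, running total, max) stand in for the real "tricky calculations".
    static String processSingeGroupOfEvents(Iterable<MyObj> events,
                                            String additionalVariable1,
                                            String additionalVariable2) {
        long count = 0;
        double runningTotal = 0.0;
        double max = Double.NEGATIVE_INFINITY;
        for (MyObj event : events) {
            count++;
            runningTotal += event.getValue();
            max = Math.max(max, event.getValue());
        }
        // Comma-separated aggregates for this session id
        return count + "," + runningTotal + "," + max;
    }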

Can you elaborate on what .processSingeGroupOfEvents() does? - vdep
@vdep Sure, updated question - WomenWhoCode
Did you consider using the DataFrame-based API? All of the operations you mentioned can be expressed in terms of SQL statements, with custom logic handled by User Defined Functions if needed. Are there any calculations which are trickier than the ones listed? If not, I would expect a significant speedup resulting from execution-plan optimization and code generation. RDD vs DataFrame performance comparison - Mateusz Piotrowski
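
For reference on the last comment: a rough sketch of what the DataFrame-based version could look like on Spark 2.x (i.e. only after an upgrade), reusing the input and outputS3Location variables from the question; the column names and the CSV schema below are assumptions:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import static org.apache.spark.sql.functions.*;

    SparkSession spark = SparkSession.builder().appName("SessionAggregation").getOrCreate();

    // Assumed schema: each line parses into (sessionId, value, timestamp)
    Dataset<Row> events = spark.read()
            .csv(input)
            .toDF("sessionId", "value", "timestamp")
            .withColumn("value", col("value").cast("double"));

    Dataset<Row> perSession = events
            .filter(col("value").isNotNull())            // stand-in for applySomeFilter
            .groupBy(col("sessionId"))
            .agg(count(col("sessionId")).as("eventCount"),
                 sum("value").as("runningTotal"),
                 max("value").as("maxValue"));

    perSession.coalesce(1)
            .write()
            .csv(outputS3Location);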

1 Answer

1 vote

Is processSingeGroupOfEvents reducing the amount of data? If so, replacing groupBy and mapValues with aggregateByKey can significantly reduce the amount of data that has to be shuffled.
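
If that's the case, a minimal sketch of the aggregateByKey variant (reusing eventsCollection and outputS3Location from the question) could look like this; SessionStats, its fields, and the getValue() getter are placeholders for whatever processSingeGroupOfEvents really computes:

    // Requires: import scala.Tuple2;
    // Hypothetical accumulator -- replace its fields with whatever processSingeGroupOfEvents
    // actually needs to track. getValue() on MyObj is an assumed numeric getter.
    public static class SessionStats implements java.io.Serializable {
        long count = 0;
        double total = 0.0;
        double max = Double.NEGATIVE_INFINITY;
    }

    JavaPairRDD<String, MyObj> keyedEvents = eventsCollection
            .mapToPair(e -> new Tuple2<>(e.getSessionId(), e));

    JavaPairRDD<String, SessionStats> perSession = keyedEvents.aggregateByKey(
            new SessionStats(),                  // zero value, one per key
            (stats, event) -> {                  // fold one event into the partition-local result
                stats.count++;
                stats.total += event.getValue();
                stats.max = Math.max(stats.max, event.getValue());
                return stats;
            },
            (a, b) -> {                          // merge partial results from different partitions
                a.count += b.count;
                a.total += b.total;
                a.max = Math.max(a.max, b.max);
                return a;
            });

    perSession
            .map(t -> t._1() + "," + t._2().count + "," + t._2().total + "," + t._2().max)
            .coalesce(1)
            .saveAsTextFile(outputS3Location);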

After that, I'd suggest following the generic Spark tuning guide: https://spark.apache.org/docs/latest/tuning.html. Check the Spark Web UI for garbage collection times. EMR comes with Ganglia, which can be used to monitor the individual nodes in the cluster. Are CPU and memory used evenly across the nodes?

Finally, you can run the job on the current amount of data but with half the number of nodes in the cluster. If the job finishes and takes roughly double the time, that's a good sign the load is spread evenly and the job is likely to scale up. If it crashes, or if it does not slow down significantly (which would mean the extra nodes were not really being used), there are some serious bottlenecks in the job.