Steps:
1) Read the file from S3 (140-200 GB).
2) Then I apply a filter() function to remove most of the records (around 70% will be removed).
3) For the remaining records (around 40-50 GB in total), I use map() to create a JavaRDD of MyObj.
4) After that, I use groupBy to group objects by the sessionId field. For the total of 40-50 GB, I expect to get around 300,000 groups.
5) Then, for each group, I call processSingeGroupOfEvents() (this function takes an Iterable and does some simple work for each Iterable [out of a total of 300,000]).
6) After that, I use coalesce() and saveAsTextFile() to write the results to S3 (the output size would be around 1-2 GB).
Pseudocode:
JavaRDD<MyObj> eventsCollection = context.textFile(input)
.filter(data -> applySomeFilter(data)) // This will filter ~70% of records
.map(data -> createMyObjFromData(data));
JavaPairRDD<String, Iterable<MyObj>> eventsCollectionGroupedById = eventsCollection
.groupBy(x -> x.getSessionId())
.persist(StorageLevel.MEMORY_AND_DISK());
JavaPairRDD<String, String> groupedByIdResults = eventsCollectionGroupedById
.mapValues(iterable -> processSingeGroupOfEvents(iterable, additionalVariable1, additionalVariable2 ));
groupedByIdResults
.coalesce(1) // Merge all partitions into a single one (to avoid multiple output files)
.map(data -> data._2())
.saveAsTextFile(outputS3Location);
My current configuration for the AWS EMR Spark application is:
- 4 core instances of type r3.8xlarge
- 1 r3.8xlarge instance for the master node
- EMR release label: emr-5.11.0
- maximizeResourceAllocation=true (see the JSON snippet below)
- Spark version 1.4 (I cannot update Spark to the newest version right now)
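For reference, maximizeResourceAllocation is set through an EMR configuration classification when the cluster is created; the corresponding JSON looks like this:

[
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "true"
    }
  }
]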
For now, it takes about 30-50 minutes to execute such a job. However, I expect the input data to double in size in the future (~300 GB of data => ~600,000 groups to aggregate by session id).
How can I estimate whether my cluster will be able to handle such a load if my data doubles? Also, I sometimes get an error saying that the quota limit for the r3.8xlarge instance type in my region has been exceeded, so I worry that if I add more hardware, I will hit this issue more often.
UPDATE: The processSingeGroupOfEvents() method iterates over a group of events (an Iterable) with the same session id and performs some tricky calculations (for example, computing a running total, finding the max of some elements in the group, parsing timestamps, etc.). It returns a comma-separated string with the aggregated values for that particular session id.
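For illustration, a minimal sketch of what such a method could look like; the accessors getValue() and getTimestamp() on MyObj are assumptions for the example, not taken from the actual code:

private static String processSingeGroupOfEvents(Iterable<MyObj> events,
                                                String additionalVariable1,
                                                String additionalVariable2) {
    // Hypothetical accessors: getValue() and getTimestamp() are assumed fields of MyObj.
    double runningTotal = 0.0;
    double max = Double.NEGATIVE_INFINITY;
    long firstSeenMillis = Long.MAX_VALUE;
    for (MyObj event : events) {
        runningTotal += event.getValue();                // running total
        max = Math.max(max, event.getValue());           // max of some elements in the group
        long ts = java.time.Instant.parse(event.getTimestamp()).toEpochMilli(); // parse timestamp
        firstSeenMillis = Math.min(firstSeenMillis, ts);
    }
    // Comma-separated string with aggregated values for this session id
    return runningTotal + "," + max + "," + firstSeenMillis;
}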
Comments:
- What does processSingeGroupOfEvents() do? - vdep
- Have you considered the DataFrame-based API? All of the operations you mentioned can be expressed in terms of SQL statements, with possible usage of custom logic via User Defined Functions. Are there any calculations trickier than the ones listed? If not, I would expect a significant speedup resulting from execution plan optimization and code generation. See also: RDD vs DataFrame performance comparison. - Mateusz Piotrowski
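To illustrate the suggestion in the last comment, here is a hedged sketch of the same aggregation with the DataFrame API (Spark 2.x syntax; the column names sessionId, value, and timestamp, as well as the filter predicate, are assumptions about the data, not from the original post):

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("sessionAggregation").getOrCreate();

// Assumed schema: sessionId (string), value (double), timestamp (string).
Dataset<Row> events = spark.read().json(input); // or parse the raw text records into columns

Dataset<Row> perSession = events
    .filter(col("value").isNotNull()) // stand-in for the applySomeFilter() predicate
    .groupBy(col("sessionId"))
    .agg(sum("value").alias("runningTotal"),                        // total per session
         max("value").alias("maxValue"),                            // max element in the group
         min(unix_timestamp(col("timestamp"))).alias("firstSeen")); // parsed timestamp

perSession.write().csv(outputS3Location); // comma-separated output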