I have an RDD (RDD[(String, Iterable[Event])]) whose key represents a month in a year and whose values are the millions of events that occurred during that month.
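For context, it is built roughly like this (a sketch only; allEvents and monthKey stand in for my actual event RDD and key derivation):

// Sketch: allEvents is the full RDD[Event]; monthKey is a placeholder for
// whatever derives the "year/month" key from an event's timestamp.
val eventsByMonth: RDD[(String, Iterable[Event])] =
  allEvents.groupBy(event => monthKey(event))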
I want to loop through each key and create an RDD of that key's events. Then I want to create an event RDD for each day of the current month's events so that I can send them to the relevant S3 location (the "directory" structure is bucketName/year/month/day).
The problem is that it seems you cannot create RDDs inside the foreach of another RDD. So I'm not sure how to achieve what I want without loading the entire main RDD into memory (which would certainly blow out the memory of the driver and defeat the point of using Spark in the first place).
Maybe there is a way to achieve this with Spark; I'm just not knowledgeable enough with it to know, and I was hoping someone here could help.
Here is the code I have at the moment:
import java.util.UUID
import scala.util.Try
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

private def store(
  eventsByMonth: RDD[(String, Iterable[Event])]
)(
  implicit sqlContext: SQLContext
): Try[Unit] =
  Try(
    eventsByMonth
      .foreach {
        case (_, events: Iterable[Event]) =>
          // This closure runs on the executors, where the SparkContext is not
          // available, so parallelize cannot be called from here.
          writeToS3Files(sqlContext.sparkContext.parallelize(events.toSeq))
      }
  )
private def writeToS3Files(events: RDD[Event])(
  implicit sqlContext: SQLContext
): Try[Unit] =
  Try(
    // outputFilePath will contain the day that these events are related to.
    events.groupBy(_.outputFilePath).foreach {
      case (filePath: String, eventsForFile: Iterable[Event]) =>
        writeToS3File(filePath, sqlContext.sparkContext.parallelize(eventsForFile.toSeq))
    }
  )
private def writeToS3File(filePath: String, events: RDD[Event]): Try[Unit] = {
  val fileNameWithPath = s"${filePath}${UUID.randomUUID().toString}.gz"
  Try(events.saveAsTextFile(fileNameWithPath, classOf[GzipCodec]))
}
Is there a reason writeToS3File needs to work on an RDD[Event] and not on an Iterable[Event]? Cause I would try to go with an RDD[(S3Path, Iterable[Event])] and then rdd.foreach(Function.tupled(writeToS3File)). If you can manage an RDD that's grouped on the month, then an RDD grouped on day should definitely work too. - Jasper-M
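If I understand that suggestion correctly, it would look something like the following rough, untested sketch: re-key the events by their day-level output path and write each group directly from the foreach closure, so the executors never need the SparkContext. The write goes through the Hadoop FileSystem API instead of saveAsTextFile, assumes filePath is a full s3:// (or s3a://) URI, and assumes Event.toString produces the line format I want:

import java.util.UUID
import java.util.zip.GZIPOutputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD

// eventsByDay is keyed by the day-level output path (bucketName/year/month/day/).
private def store(eventsByDay: RDD[(String, Iterable[Event])]): Try[Unit] =
  Try(
    // The closure runs on the executors but never touches the SparkContext,
    // so no RDD is created inside the foreach.
    eventsByDay.foreach(Function.tupled(writeToS3File _))
  )

private def writeToS3File(filePath: String, events: Iterable[Event]): Unit = {
  val fileNameWithPath = s"${filePath}${UUID.randomUUID().toString}.gz"
  // In practice the Hadoop configuration (including the S3 credentials)
  // has to be available on the executors for this to work.
  val fs = FileSystem.get(new java.net.URI(fileNameWithPath), new Configuration())
  val out = new GZIPOutputStream(fs.create(new Path(fileNameWithPath)))
  try events.foreach(event => out.write((event.toString + "\n").getBytes("UTF-8")))
  finally out.close()
}

That writes each day's group from a single task, which should be fine as long as one day's events fit comfortably in an executor's memory.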