I'm trying to read files from multiple s3 buckets.
Originally the buckets would be in different regions, but it looks like that isn't possible.
So now I've copied the other bucket to the same region as the first bucket to read from, which is the same region I'm executing the spark job from.
SparkSession set up:
val sparkConf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array(classOf[Event]))
SparkSession.builder
.appName("Merge application")
.config(sparkConf)
.getOrCreate()
Function that gets called using the SQLContext from the create SparkSession:
private def parseEvents(bucketPath: String, service: String)(
implicit sqlContext: SQLContext
): Try[RDD[Event]] =
Try(
sqlContext.read
.option("codec", "org.apache.hadoop.io.compress.GzipCodec")
.json(bucketPath)
.toJSON
.rdd
.map(buildEvent(_, bucketPath, service).get)
)
Main flow:
for {
bucketOnePath <- buildBucketPath(config.bucketOne.name)
_ <- log(s"Reading events from $bucketOnePath")
bucketOneEvents: RDD[Event] <- parseEvents(bucketOnePath, config.service)
_ <- log(s"Enriching events from $bucketOnePath with originating region data")
bucketOneEventsWithRegion: RDD[Event] <- enrichEventsWithRegion(
bucketOneEvents,
config.bucketOne.region
)
bucketTwoPath <- buildBucketPath(config.bucketTwo.name)
_ <- log(s"Reading events from $bucketTwoPath")
bucketTwoEvents: RDD[Event] <- parseEvents(config.bucketTwo.name, config.service)
_ <- log(s"Enriching events from $bucketTwoPath with originating region data")
bucketTwoEventsWithRegion: RDD[Event] <- enrichEventsWithRegion(
bucketTwoEvents,
config.bucketTwo.region
)
_ <- log("Merging events")
mergedEvents: RDD[Event] <- merge(bucketOneEventsWithRegion, bucketTwoEventsWithRegion)
if mergedEvents.isEmpty() == false
_ <- log("Grouping merged events by partition key")
mergedEventsByPartitionKey: RDD[(EventsPartitionKey, Iterable[Event])] <- eventsByPartitionKey(
mergedEvents
)
_ <- log(s"Storing merged events to ${config.outputBucket.name}")
_ <- store(config.outputBucket.name, config.service, mergedEventsByPartitionKey)
} yield ()
The error I get in the logs (actual bucket names have been changed but the real names do exist):
19/04/09 13:10:20 INFO SparkContext: Created broadcast 4 from rdd at MergeApp.scala:141
19/04/09 13:10:21 INFO FileSourceScanExec: Planning scan with bin packing, max size: 134217728 bytes, open cost is considered as scanning 4194304 bytes.
org.apache.spark.sql.AnalysisException: Path does not exist: hdfs:someBucket2
And my stdout logs show how far the main code goes before failing:
Reading events from s3://someBucket/*/*/*/*/*.gz
Enriching events from s3://someBucket/*/*/*/*/*.gz with originating region data
Reading events from s3://someBucket2/*/*/*/*/*.gz
Merge failed: Path does not exist: hdfs://someBucket2
Strangely, the first read always works no matter which bucket I choose. But the second read always fails, no matter the bucket. This tells me there's nothing wrong with the buckets, but some spark oddity when working with multiple s3 buckets.
I can only see threads on reading multiple files from a single s3 bucket, not multiple files from multiple s3 buckets.
Any ideas?