I am using AWS S3 as backup storage for data coming into our Spark cluster. Data comes in every second and is processed once 10 seconds of data have been read. The RDD containing the 10 seconds of data is stored to S3 using
rdd.saveAsObjectFile(s3URL + dateFormat.format(new Date()));
This means we end up with a lot of files added to S3 each day, with keys of the form
S3URL/2017/07/23/12/00/10, S3URL/2017/07/23/12/00/20, etc.
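For reference, the date formatter is something like the following sketch (the exact pattern here is an assumption on my part, but it produces keys of the form above):

import java.text.SimpleDateFormat;
import java.util.Date;

// Assumed pattern; it yields keys such as 2017/07/23/12/00/10
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/HH/mm/ss");
rdd.saveAsObjectFile(s3URL + dateFormat.format(new Date()));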
From here it is easy to restore the RDD which is a
JavaRDD<byte[]>
using either
sc.objectFile or the AmazonS3 API
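For example, restoring one 10-second batch looks roughly like this (the path is just one of the keys from above):

// Read back a single 10-second batch as a JavaRDD<byte[]>
JavaRDD<byte[]> batch = sc.objectFile(s3URL + "2017/07/23/12/00/10");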
The problem is that, to reduce the number of files we have to iterate through, we run a daily cron job that goes through every file for a given day, bundles the data together and stores the new RDD to S3. This is done as follows:
List<byte[]> dataList = new ArrayList<>(); // A list of all read messages

/* Get all messages from S3 and store them in the above list */
try {
    final ListObjectsV2Request req = new ListObjectsV2Request()
            .withBucketName("bucketname")
            .withPrefix("logs/" + dateString);
    ListObjectsV2Result result;
    do {
        result = s3Client.listObjectsV2(req);
        for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
            System.out.println(" - " + objectSummary.getKey() +
                    " (size = " + objectSummary.getSize() + ")");
            if (objectSummary.getKey().contains("part-00000")) { // The messages are stored in files named "part-00000"
                S3Object object = s3Client.getObject(
                        new GetObjectRequest(objectSummary.getBucketName(), objectSummary.getKey()));
                InputStream objectData = object.getObjectContent();
                byte[] byteData = new byte[(int) objectSummary.getSize()]; // The size of the messages differ
                // read() is not guaranteed to fill the buffer in one call, so loop until all bytes are read
                int offset = 0;
                while (offset < byteData.length) {
                    int read = objectData.read(byteData, offset, byteData.length - offset);
                    if (read == -1) {
                        break;
                    }
                    offset += read;
                }
                dataList.add(byteData); // Add the message to the list
                objectData.close();
            }
        }
        /* When listing, results come back in pages linked by continuation tokens.
         * All pages have to be iterated through to get all objects. */
        System.out.println("Next Continuation Token : " + result.getNextContinuationToken());
        req.setContinuationToken(result.getNextContinuationToken());
    } while (result.isTruncated());
} catch (AmazonServiceException ase) {
    System.out.println("Caught an AmazonServiceException, which means the request made it " +
            "to Amazon S3, but was rejected with an error response for some reason.");
    System.out.println("Error Message: " + ase.getMessage());
    System.out.println("HTTP Status Code: " + ase.getStatusCode());
    System.out.println("AWS Error Code: " + ase.getErrorCode());
    System.out.println("Error Type: " + ase.getErrorType());
    System.out.println("Request ID: " + ase.getRequestId());
} catch (AmazonClientException ace) {
    System.out.println("Caught an AmazonClientException, which means the client encountered " +
            "an internal error while trying to communicate with S3, " +
            "such as not being able to access the network.");
    System.out.println("Error Message: " + ace.getMessage());
} catch (IOException e) {
    e.printStackTrace();
}

JavaRDD<byte[]> messages = sc.parallelize(dataList); // Loads the messages into an RDD
messages.saveAsObjectFile("S3URL/daily_logs/" + dateString);
This all works fine, but now I am not sure how to actually restore the data to a manageable state again. If I use
sc.objectFile
to restore the RDD, I end up with a JavaRDD<byte[]> where each byte[] is itself a serialized JavaRDD<byte[]>, i.e. the raw contents of one of the original part-00000 files. How can I restore the nested JavaRDD from the byte[] elements of the outer JavaRDD<byte[]>?
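To illustrate, this is roughly what I am doing and what I get back (the variable names are just for illustration):

// Restore the bundled daily file
JavaRDD<byte[]> daily = sc.objectFile("S3URL/daily_logs/" + dateString);
// Each element here is not a single message, but the complete serialized
// contents of one original part-00000 object file, i.e. a whole batch of byte[] messages.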
I hope this somehow makes sense and I am grateful for any help. In a worst-case scenario I will have to come up with another way to back up the data.
Best regards, Mathias