
I am using AWS S3 as backup storage for data coming into our Spark cluster. Data arrives every second and is processed once 10 seconds of data have been read. The RDD containing the 10 seconds of data is stored to S3 using

rdd.saveAsObjectFile(s3URL + dateFormat.format(new Date()));

This means that we get a lot of files added to S3 each day in the format of

S3URL/2017/07/23/12/00/10, S3URL/2017/07/23/12/00/20 etc
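The key layout above suggests the `dateFormat` in the save call uses a year/month/day/hour/minute/second pattern. A minimal sketch of such a formatter (the exact pattern and time zone are assumptions, not taken from the original code):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class KeyFormatSketch {
    public static void main(String[] args) {
        // Hypothetical pattern matching the S3 key layout shown above.
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/HH/mm/ss");
        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
        Date d = new Date(1500811210000L); // 2017-07-23T12:00:10Z
        System.out.println(dateFormat.format(d)); // prints 2017/07/23/12/00/10
    }
}
```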

From here it is easy to restore the RDD which is a

JavaRDD<byte[]>

using either

sc.objectFile or the AmazonS3 API

The problem is that, to reduce the number of files we need to iterate through, we run a daily cron job that goes through each file for a given day, bundles the data together, and stores the new RDD to S3. This is done as follows:

List<byte[]> dataList = new ArrayList<>(); // A list of all read messages
    /* Get all messages from S3 and store them in the above list */
    try {
        final ListObjectsV2Request req = new ListObjectsV2Request()
                .withBucketName("bucketname")
                .withPrefix("logs/" + dateString);
        ListObjectsV2Result result;
        do {
            result = s3Client.listObjectsV2(req);
            for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
                System.out.println(" - " + objectSummary.getKey() +
                        "  (size = " + objectSummary.getSize() + ")");
                if (objectSummary.getKey().contains("part-00000")) { // The messages are stored in files named "part-00000"
                    S3Object object = s3Client.getObject(
                            new GetObjectRequest(objectSummary.getBucketName(), objectSummary.getKey()));
                    InputStream objectData = object.getObjectContent();
                    byte[] byteData = new byte[(int) objectSummary.getSize()]; // The size of the messages differs
                    /* InputStream.read may return fewer bytes than requested,
                     * so keep reading until the buffer is full. */
                    int offset = 0;
                    while (offset < byteData.length) {
                        int read = objectData.read(byteData, offset, byteData.length - offset);
                        if (read == -1) break; // Unexpected end of stream
                        offset += read;
                    }
                    dataList.add(byteData); // Add the message to the list
                    objectData.close();
                }
            }
            /* Listing results are paginated; follow the continuation token
             * until all objects have been listed. */
            System.out.println("Next Continuation Token : " + result.getNextContinuationToken());
            req.setContinuationToken(result.getNextContinuationToken());
        } while (result.isTruncated());
    } catch (AmazonServiceException ase) {
        System.out.println("Caught an AmazonServiceException, which means the request " +
                "made it to Amazon S3 but was rejected with an error response.");
        System.out.println("Error Message:    " + ase.getMessage());
        System.out.println("HTTP Status Code: " + ase.getStatusCode());
        System.out.println("AWS Error Code:   " + ase.getErrorCode());
        System.out.println("Error Type:       " + ase.getErrorType());
        System.out.println("Request ID:       " + ase.getRequestId());
    } catch (AmazonClientException ace) {
        System.out.println("Caught an AmazonClientException, which means the client " +
                "encountered an internal error while trying to communicate with S3, " +
                "such as not being able to access the network.");
        System.out.println("Error Message: " + ace.getMessage());
    } catch (IOException e) {
        e.printStackTrace();
    }
    JavaRDD<byte[]> messages = sc.parallelize(dataList); // Load the messages into an RDD
    messages.saveAsObjectFile("S3URL/daily_logs/" + dateString);

This all works fine, but now I am not sure how to restore the data to a manageable state. If I use

sc.objectFile

to restore the RDD, I end up with a JavaRDD<byte[]> where each byte[] is itself a serialized JavaRDD<byte[]>. How can I restore the nested JavaRDD from the byte[] elements inside the outer JavaRDD<byte[]>?

I hope this somehow makes sense and I am grateful for any help. In a worst case scenario I have to come up with another way to backup the data.

Best regards Mathias


1 Answer


I solved it: instead of storing a nested RDD, I flatMapped all the byte[] into a single JavaRDD and stored that one instead.
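For illustration, the same flattening expressed in plain Java streams (a stand-in for JavaRDD.flatMap; the names and sample data are hypothetical, not from the original code):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlattenSketch {
    public static void main(String[] args) {
        // Each inner list stands in for one ten-second batch of messages.
        List<List<byte[]>> nested = Arrays.asList(
                Arrays.asList("a".getBytes(), "b".getBytes()),
                Arrays.asList("c".getBytes()));
        // Flatten into one flat list of byte[], the same shape that
        // JavaRDD.flatMap produces, so saveAsObjectFile stores plain
        // byte[] records instead of serialized-RDD blobs.
        List<byte[]> flat = nested.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
        System.out.println(flat.size()); // prints 3
    }
}
```

Restoring the daily file with sc.objectFile then yields a flat JavaRDD<byte[]> directly, with no nested deserialization step.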