6 votes

So this problem has been driving me nuts, and it is starting to feel like Spark with S3 is not the right tool for this specific job. Basically, I have millions of smaller files in an S3 bucket. For reasons I can't necessarily get into, these files cannot be consolidated (for one, they are unique encrypted transcripts). I have seen similar questions to this one, and none of the solutions produced good results. The first thing I tried was wildcards:

sc.wholeTextFiles(s3aPath + "/*/*/*/*.txt").count();

Note: the count was more for debugging, to see how long it would take to process the files. The job took almost an entire day with over 10 instances, but still failed with the error posted at the bottom of the listing. I then found this link, which basically says this isn't optimal: https://forums.databricks.com/questions/480/how-do-i-ingest-a-large-number-of-files-from-s3-my.html

Then I decided to try another solution, which I can't find at the moment, that said to load all of the paths and then union all of the RDDs:

    ObjectListing objectListing = s3Client.listObjects(bucket);
    List<JavaPairRDD<String, String>> rdds = new ArrayList<>();
    List<JavaPairRDD<String, String>> tempMeta = new ArrayList<>();
    // process the first page of the listing: one wholeTextFiles RDD per transcript
    tempMeta.addAll(objectListing.getObjectSummaries().stream()
            .map(func)
            .filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript"))
            .map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName()))
            .collect(Collectors.toList()));

    // page through the rest of the listing
    while (objectListing.isTruncated()) {
        objectListing = s3Client.listNextBatchOfObjects(objectListing);
        tempMeta.addAll(objectListing.getObjectSummaries().stream()
                .map(func)
                .filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript"))
                .map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName()))
                .collect(Collectors.toList()));
        // periodically drain the temporary list into the main list
        if (tempMeta.size() > 5000) {
            rdds.addAll(tempMeta);
            tempMeta = new ArrayList<>();
        }
    }

    if (!tempMeta.isEmpty()) {
        rdds.addAll(tempMeta);
    }
    // union everything into a single RDD
    return SparkConfig.getSparkContext().union(rdds.get(0), rdds.subList(1, rdds.size()));
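
A variation I also considered (but have not properly tested) is to avoid creating one RDD per file and instead batch the keys into a comma-separated path list for a single wholeTextFiles call, since the underlying FileInputFormat accepts comma-separated paths. A rough sketch of that, reusing the same s3Client, bucket, and func as above; I'm not sure how well it would hold up with millions of paths in one string:

    // Rough sketch (untested): collect the transcript paths first, then hand them to
    // a single wholeTextFiles() call as a comma-separated list instead of building
    // one RDD per file and unioning them all.
    List<String> paths = new ArrayList<>();
    ObjectListing listing = s3Client.listObjects(bucket);
    while (true) {
        listing.getObjectSummaries().stream()
                .map(func)
                .filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript"))
                .forEach(item -> paths.add("s3a://" + bucket + "/" + item.getFileName()));
        if (!listing.isTruncated()) {
            break;
        }
        listing = s3Client.listNextBatchOfObjects(listing);
    }
    // one RDD for everything; the comma-separated form is split by FileInputFormat
    JavaPairRDD<String, String> transcripts =
            SparkConfig.getSparkContext().wholeTextFiles(String.join(",", paths));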

Then, even when I set the emrfs-site config to:

{
    "Classification": "emrfs-site",
    "Properties": {
      "fs.s3.consistent.retryPolicyType": "fixed",
      "fs.s3.consistent.retryPeriodSeconds": "15",
      "fs.s3.consistent.retryCount": "20",
      "fs.s3.enableServerSideEncryption": "true",
      "fs.s3.consistent": "false"
    }
}

I got this error within 6 hours every time I tried running the job:

17/02/15 19:15:41 INFO AmazonHttpClient: Unable to execute HTTP request: randomBucket.s3.amazonaws.com:443 failed to respond
org.apache.http.NoHttpResponseException: randomBucket.s3.amazonaws.com:443 failed to respond

So first, is there a way to use smaller files with Spark from S3? I don't care if the solution is suboptimal, I just want to try to get something working. I have thought about trying Spark Streaming, since its internals are a little different when it comes to loading all of the files. I would then use fileStream with newFilesOnly set to false, and batch-process the files as they come in. However, that is not what Spark Streaming was built for, so I am conflicted about going that route.
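
For what it's worth, the rough shape of the Spark Streaming idea I have in mind is below. This is only a sketch, untested against S3; the batch interval and filter are placeholders, and it assumes the usual Hadoop LongWritable/Text/TextInputFormat classes plus the Java streaming API:

    // Sketch of the fileStream idea: newFilesOnly = false should make the first batch
    // pick up files that already exist under the prefix (untested on S3).
    JavaStreamingContext jssc =
            new JavaStreamingContext(SparkConfig.getSparkContext(), Durations.minutes(5));

    JavaPairInputDStream<LongWritable, Text> stream = jssc.fileStream(
            "s3a://" + bucket + "/",
            LongWritable.class, Text.class, TextInputFormat.class,
            path -> Boolean.TRUE,   // accept every file; could filter on ".txt" here
            false);                 // newFilesOnly = false -> include existing files

    stream.foreachRDD(rdd -> System.out.println("batch size: " + rdd.count()));

    jssc.start();
    jssc.awaitTermination();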

As a side note, I generated millions of small files in HDFS, tried the same job, and it finished within an hour. This makes me feel like it is S3-specific. Also, I am using s3a, not the ordinary s3.


1 Answer

15 votes

If you are using Amazon EMR, then you need to use s3:// URLs; the s3a:// ones are for the ASF releases.

A big issue is just how long it takes to list directory trees in S3, especially that recursive tree walk. The Spark code assumes it's a fast filesystem where listing directories and stat-ing files is low-cost, whereas in fact each operation takes 1-4 HTTPS requests which, even on reused HTTP/1.1 connections, hurts. It can be so slow you can see the pauses in the log.

Where this really hurts is that the up-front partitioning is where a lot of the delay happens, so it is the serialized bit of work that is being brought to its knees.

Although there's some speedup in tree walking on S3a coming in Hadoop 2.8 as part of the S3a phase II work, wildcard scans of the /*/*/*/*.txt form aren't going to get any speedup. My recommendation is to flatten your directory structure, so that you move from a deep tree to something shallow, maybe even all in the same directory, so that it can be scanned without the walk, at a cost of 1 HTTP request per 1000 entries.
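
As a rough illustration (the bucket and prefix names here are made up), once everything sits under a single flat prefix the scan becomes a plain paged listing rather than a recursive walk:

    // Illustrative only: a flat layout can be listed page by page, no treewalk needed.
    JavaPairRDD<String, String> transcripts =
            sc.wholeTextFiles("s3://some-bucket/transcripts-flat/");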

Bear in mind that many small files are pretty expensive anyway, including in HDFS, where they use up storage. There's a special aggregate format, HAR files, which are like tar files except that Hadoop, Hive and Spark can all work inside the file itself. That may help, though I've not seen any actual performance test figures there.
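
If you do experiment with HAR, the archive is built with the `hadoop archive` tool and then read through the har:// filesystem. A sketch with made-up paths, and again I have no performance numbers for it:

    // Hypothetical: transcripts packed into a single HAR archive, e.g. via
    //   hadoop archive -archiveName transcripts.har -p /input transcripts /archives
    // Spark can then read inside the archive instead of millions of individual objects.
    JavaPairRDD<String, String> archived =
            sc.wholeTextFiles("har:///archives/transcripts.har/transcripts");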