3
votes

A large MapReduce job (joining 14 input directories, adding up to about 14 TB of input) fails. Not only can we not run our actual job; even when the map is cat and the reduce is cat, we can't complete that. It seems to stall while copying the data.

Our guess is that we are saturating the Hadoop-on-EMR capacity provisioned by AWS. We're not sure whether we are saturating the network, disk space, or something else. We get errors like this

"reduce > copy (436333 of 438094 at 0.10 MB/s)"

in the Hadoop control panel. It just hangs there, never completing the copy. Another theory is that Hadoop's offline sorting is happening at the same time as the copying, and that this is somehow the bottleneck. We have tried various permutations of more reducers, more nodes, and different-sized worker boxes, but we haven't been able to find a combination that works.

Since we urgently need to get this done, our workaround is to partition the data into smaller jobs. That is, each of the 14 input years will be split up, and then we will join the partitions.

Does anyone have experience processing jobs of this size or larger on AWS-hosted Hadoop? If so, can you give advice for getting even the cat map / cat reduce job to succeed, e.g. number of nodes, size of nodes, and configuration options?

Otherwise I guess we are just hitting the limitations of EMR.


2 Answers

6
votes

Chris Smith answered this question and said I could post it to SO. His answer:

So, the size of the input data really isn't, by itself, a limit for EMR. There are just a ton of other factors.

That said, sucking in 10TB of data is a heady task. Just reading that much data is pretty brutal, then you have the bucketing/sorting going on.

First question: what is the constraining factor? Are you seeing network bandwidth maxed out? Are you seeing CPU maxed out? Disk I/O or IOPS? How do these look on the data nodes? What about the JobTracker and NameNode (it's not unusual to max these out while the rest of the cluster is fine)? If it's none of the above, there is probably a Hadoop resource that is getting maxed out and needs to be configured differently.

You didn't mention any particular aspect of the contention beyond which stage it is in, which makes me suspect you don't have much in the way of metrics on what is going on underneath. Usually you need several iterations of "measure, then adjust" before you have a big job tuned right.

As a general rule of thumb, being tied up in the "reduce/copy" phase for an extended period of time is a pretty strong indicator that "you're doing it wrong". Usually the problem is you are getting tied up in the sort/spill/merge process, with nodes maxing out disk IO in some fashion. Hadoop has a number of tuning parameters that start to get wacky for jobs with large numbers of mappers and reducers, particularly if there is a big imbalance between the two. Again, Karmasphere and similar tools can help you a lot here. Typical things that need to be tweaked (I may have some of the names wrong):

Logging. In particular, things like dfs.namenode.logging.level can be important to tweak before a job. It's entirely possible to kill yourself with verbose logging. Though paradoxically, it can be your salvation as well, so...

Map output size is usually a key factor in the "reduce/copy" problem. If at all possible, think about ways to reduce your map output size. It really should be much smaller than the map input size. Strip out any data that isn't strictly needed for the reduce phase. Consider using a compact binary serialization format (Java serialization will kill your performance) like protocol buffers or thrift (big win for integer data). Consider to what extent your strings can be represented with IDs/enums. Can you use a Combiner to reduce how much data must be sent over the wire? If you have CPU to spare, use compression (start with lzo or snappy, but if you still have more CPU to burn, consider gzip or even stronger stuff). If you still see the merge step taking a long time in your map task logs, you have some tweaking to do:

io.sort.factor: probably should be higher. You may even be suffering from having too many mappers, depending on what you are doing.

io.sort.mb: closely related to io.sort.factor, but different. If you start seeing a lot of disk I/O stress on the nodes, I'd crank this up. This chews up memory, so there is a real trade-off involved in this parameter.
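If it helps, here is a minimal sketch of how the map-output advice above (compression, a combiner, and these two io.sort knobs) might be wired up through the old mapred API. MyCombiner is a hypothetical class, the numeric values are illustrative starting points rather than recommendations, and SnappyCodec assumes your Hadoop build bundles it (otherwise substitute the LZO codec from hadoop-lzo):

    import org.apache.hadoop.io.compress.SnappyCodec;
    import org.apache.hadoop.mapred.JobConf;

    public class MapSideTuning {
        public static JobConf tune(JobConf conf) {
            // Shrink what crosses the wire: compress map output and pre-aggregate with a combiner.
            conf.setCompressMapOutput(true);
            conf.setMapOutputCompressorClass(SnappyCodec.class);  // cheap on CPU, decent ratio
            // conf.setCombinerClass(MyCombiner.class);           // MyCombiner: hypothetical old-API Reducer

            // Sort/merge knobs mentioned above; tune against observed disk I/O, not blindly.
            conf.setInt("io.sort.factor", 50);   // merge more spill segments per pass
            conf.setInt("io.sort.mb", 200);      // bigger in-memory sort buffer, paid for out of task heap
            return conf;
        }
    }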

mapred.job.reuse.jvm.num.tasks: only matters if your tasks get really small, but if they do, this is worth pushing up.

mapred.reduce.parallel.copies: if you aren't CPU bound, you may well want to boost this number. You'll probably end up needing to tweak other numbers to balance things out.
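Continuing the same sketch (same conf object as above); again, the numbers are only illustrative:

    conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);  // -1 = reuse each JVM for an unlimited number of tasks
    conf.setInt("mapred.reduce.parallel.copies", 20);   // more concurrent map-output fetches per reducer (default is 5)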

io.sort.record.percent: this one is the least likely to be completely off the mark due to job size. Usually if this is wrong, it's because you have really big or really small records. The golden ratio you want to aim for is "16/(16 + number of bytes per record)".
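For example, plugging a hypothetical 100-byte average record into that formula (reusing the conf object from the sketches above; measure your own records rather than trusting this number):

    int avgRecordBytes = 100;                            // assumption: measure this from your own data
    float recordPercent = 16f / (16 + avgRecordBytes);   // 16 / (16 + 100) is roughly 0.14
    conf.setFloat("io.sort.record.percent", recordPercent);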

It's hard to overstate how brutal spilling early can be for node performance. If you spill, that means the data will be written out, then read again, then written out again. On each node. So if you get this wrong, adding more nodes doesn't help (it can actually make things worse). You want to look at how many records were spilled for a job vs. how many map records were output. Ideally those numbers will be the same. Now, if you have to spill, you have to spill (though again, this is often an indication that you are doing something wrong), but jobs that spill to disk only once per record just crush the others.
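One way to pull that comparison out of a finished job, assuming a Hadoop release that exposes the org.apache.hadoop.mapreduce.TaskCounter enum (on older releases, just read the same counters off the JobTracker web UI), sketched as a fragment:

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.TaskCounter;

    // after job.waitForCompletion(true) on a new-API Job
    long spilled = job.getCounters().findCounter(TaskCounter.SPILLED_RECORDS).getValue();
    long mapOut  = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
    System.out.println("spilled/output ratio: " + (double) spilled / mapOut);
    // ideally close to 1.0; much higher means multiple spill/merge passes per record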

There can be a similar problem on the reducer side. Take a look at the counters for the merge phase. Ideally you want spilled records to be 0 or at least <= the number of reducer input records. If it is higher... that's why you have a performance problem (seriously, this can be absolutely brutal). Pay attention to the various reducer spill settings: mapred.job.shuffle.input.buffer.percent, mapred.job.shuffle.merge.percent, mapred.inmem.merge.threshold, io.sort.factor. The one that usually gets fubar'd for big jobs is mapred.inmem.merge.threshold. The first two are also often screwed up, but that happens more as a function of the nature of the job, than as a function of the job size.
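A sketch of those reduce-side settings, again reusing the conf object from above; the classic defaults are roughly 0.70, 0.66, and 1000, and these values are only illustrative:

    conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.70f); // share of reducer heap for holding fetched map output
    conf.setFloat("mapred.job.shuffle.merge.percent", 0.66f);        // fill level that triggers an in-memory merge
    conf.setInt("mapred.inmem.merge.threshold", 0);                  // 0 = trigger merges on buffer fill only, not segment count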

dfs.namenode.handler.count: if you are generating lots of little files in HDFS, you definitely want to push this up

mapred.job.tracker.handler.count: look at how many tasks you have to get an idea of whether this should be higher. If you are creating thousands of little tasks running on hundreds of nodes, you aren't going to be satisfied with this being 10.

dfs.datanode.handler.count: this one goes hand in hand with the parallel.copies flag. This one always gets me into trouble because my first instinct is to ramp it up really high, and then I just create logjams in other places. ;-) Anyway, if you consider how many mappers you have talking to how many reducers, it probably makes sense to boost this a fair bit.

tasktracker.http.threads: this one is less likely to be the issue if you are stuck in reduce/copy. It is closer to where it should be anyway.

mapred.local.dir: this is one I have often had to tweak on non-EMR clusters for jobs with massive map output. You can really become disk bound and disk-space bound, so I've found it helpful to change the path to a comma-delimited list of directories, one for each drive. Of course, with EMR it doesn't make sense, but it nonetheless points to how you can really run out of disk space fast.

mapred.local.dir.minspacestart: You may not realize it, but you could be running out of space for your map output. Tweaking this value to make sure that each task has enough remaining space on the system before kicking off a job can really save your bacon.

Keep in mind, Hadoop was really designed for systems with 2 cores per spindle (and this was several iterations of Moore's Law ago), with all the input and output staying inside HDFS (which allows tons of shortcuts for input and output), 1 GigE port per 8 cores, and few bottlenecks in the switch fabric. EMR gives you nothing like that. Amazon tries to provide some decent defaults to compensate, but it's hard to solve the problem generically for everyone. EMR's one advantage is that you tend to get massive amounts of RAM per node, so you should spend some time making sure you use your RAM optimally to minimize disk I/O.

Hadoop also really shines for jobs where the mappers consume a lot of raw data but spit out comparatively little data. There's a massive distributed sort going on for all the data you produce in each job, and Hadoop tries, by default, to do it while leaving the bulk of the RAM and disk space available for your tasks. Having your data already bucketed/sorted can push a lot of work out of the reducer and into the mapper, thereby avoiding a TON of overhead. Chances are, that's where your problem is.

0
votes

You don't mention the number of reducers or the total number of slots per host in the cluster, but it looks like your reducers are DoS-ing the mapper hosts. You can do several things to solve this problem:

  1. Increase the block size. That gives you fewer maps, and each mapper does more work. It also means fewer map-output parts for the reducers to download.
  2. Check the number of HTTP handlers (mapreduce.tasktracker.http.threads) if your cluster is relatively big (100+ nodes).
  3. The number of reducers should be between 0.7 and 1.2 times the cluster's reduce capacity (reducer slots); see the sketch after this list.
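For point 3, a rough worked example of sizing the reducer count from cluster capacity, assuming an old-API JobConf named conf; the node and slot numbers are pure placeholders, so use your own cluster's values:

    int nodes = 20;                              // assumption: task-tracker nodes in the cluster
    int reduceSlotsPerNode = 4;                  // assumption: mapred.tasktracker.reduce.tasks.maximum per node
    int reduceSlots = nodes * reduceSlotsPerNode;           // 80 reduce slots total
    conf.setNumReduceTasks((int) (reduceSlots * 0.95));     // 76 reducers, inside the 0.7-1.2 x capacity band
    // For point 1, bigger input splits (hence fewer mappers) can also be approximated per job with
    // mapred.min.split.size if re-writing the input with a larger block size isn't practical.
    conf.setLong("mapred.min.split.size", 256L * 1024 * 1024);  // honored by FileInputFormat-based jobs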

Hope this helps.