I think your problem lies in reading the file inside reduce(). You should read the files in configure() (using the old API) or setup() (using the new API) instead, so each reducer reads them just once, rather than once for every input group passed to the reducer (basically, on each call to the reduce method).
You can write something like:
Using NEW mapreduce API (org.apache.hadoop.mapreduce.*) -
public static class ReduceJob extends Reducer<Text, Text, Text, Text> {

    ...
    Path file1;
    Path file2;
    ...

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Runs once per reducer, before any calls to reduce()
        file1 = DistributedCache.getLocalCacheFiles(context.getConfiguration())[0];
        file2 = DistributedCache.getLocalCacheFiles(context.getConfiguration())[1];
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        ...
    }
}
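If you need the file contents rather than just the paths, you can also load them into memory once in setup(). A minimal sketch, assuming the cached file is plain text with tab-separated key/value pairs (the lookup field, the loadLookup helper and the parsing are illustrative, not part of your job):

// Needs: java.io.BufferedReader, java.io.FileReader, java.util.HashMap, java.util.Map
private Map<String, String> lookup = new HashMap<String, String>();

// Call this from setup(), e.g. loadLookup(file1);
private void loadLookup(Path cachedFile) throws IOException {
    BufferedReader reader = new BufferedReader(new FileReader(cachedFile.toString()));
    try {
        String line;
        while ((line = reader.readLine()) != null) {
            String[] parts = line.split("\t", 2);
            if (parts.length == 2) {
                lookup.put(parts[0], parts[1]);
            }
        }
    } finally {
        reader.close();
    }
}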
Using OLD mapred API (org.apache.hadoop.mapred.*) -
public static class ReduceJob extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

    ...
    Path file1;
    Path file2;
    ...

    @Override
    public void configure(JobConf job) {
        // Runs once per reducer, before any calls to reduce().
        // getLocalCacheFiles() throws a checked IOException, which configure() cannot declare.
        try {
            file1 = DistributedCache.getLocalCacheFiles(job)[0];
            file2 = DistributedCache.getLocalCacheFiles(job)[1];
        } catch (IOException e) {
            throw new RuntimeException("Could not read distributed cache files", e);
        }
        ...
    }

    @Override
    public synchronized void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output,
            Reporter reporter) throws IOException {
        ...
    }
}
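Also make sure the files are registered in the DistributedCache on the driver side before the job is submitted, otherwise getLocalCacheFiles() will return null. A rough sketch (the paths are placeholders, and I'm assuming the files come back in the order they were added, which is how [0] and [1] above are indexed):

// Driver side, before building/submitting the JobConf or Job (paths are placeholders)
Configuration conf = new Configuration();
DistributedCache.addCacheFile(new Path("/user/hadoop/cache/file1.txt").toUri(), conf);
DistributedCache.addCacheFile(new Path("/user/hadoop/cache/file2.txt").toUri(), conf);
// ...then create the JobConf / Job from this conf as usual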