I think your problem lies in reading the files in reduce(). You should read them in configure() (old API) or setup() (new API) instead. That way each reducer reads the files just once, rather than once for every input group it receives (basically, on every call to the reduce method).
You can write something like:
Using NEW mapreduce API (org.apache.hadoop.mapreduce.*) -
public static class ReduceJob extends Reducer<Text, Text, Text, Text> {

    ...
    Path file1;
    Path file2;
    ...

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Get the files from the distributed cache
        file1 = DistributedCache.getLocalCacheFiles(context.getConfiguration())[0];
        file2 = DistributedCache.getLocalCacheFiles(context.getConfiguration())[1];

        // Parse the files and keep their data in memory for use in the reduce method,
        // probably in an ArrayList or a HashMap.
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        ...
    }
}
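For example, the parsing step in setup() could look something like the sketch below. It assumes file1 is a plain text file of tab-separated key/value pairs and that you want the data in a HashMap named lookup (both the format and the field name are just placeholders for your own); it also needs java.io.BufferedReader, java.io.FileReader, java.util.HashMap and java.util.Map imports:

private Map<String, String> lookup = new HashMap<String, String>();

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    file1 = DistributedCache.getLocalCacheFiles(context.getConfiguration())[0];

    // Read the local copy of the cached file line by line and load it into the map
    BufferedReader reader = new BufferedReader(new FileReader(file1.toString()));
    try {
        String line;
        while ((line = reader.readLine()) != null) {
            String[] parts = line.split("\t", 2);   // assumed tab-separated format
            if (parts.length == 2) {
                lookup.put(parts[0], parts[1]);     // now available to every reduce() call
            }
        }
    } finally {
        reader.close();
    }
}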
Using OLD mapred API (org.apache.hadoop.mapred.*) -
public static class ReduceJob extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

    ...
    Path file1;
    Path file2;
    ...

    @Override
    public void configure(JobConf job) {
        // Get the files from the distributed cache (configure() cannot throw the
        // checked IOException, so wrap it)
        try {
            file1 = DistributedCache.getLocalCacheFiles(job)[0];
            file2 = DistributedCache.getLocalCacheFiles(job)[1];
        } catch (IOException e) {
            throw new RuntimeException("Could not read the distributed cache", e);
        }
        ...

        // Parse the files and keep their data in memory for use in the reduce method,
        // probably in an ArrayList or a HashMap.
    }

    @Override
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output,
                       Reporter reporter) throws IOException {
        ...
    }
}
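And in case you haven't done it yet, the files also have to be added to the distributed cache in the driver before the job is submitted, roughly like this (the HDFS paths below are just placeholders; conf is the job's Configuration with the new API, or the JobConf itself with the old one):

DistributedCache.addCacheFile(new Path("/user/me/cache/file1.txt").toUri(), conf);
DistributedCache.addCacheFile(new Path("/user/me/cache/file2.txt").toUri(), conf);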