
Problem statement - Find the maximum value and print it along with the key

Input :

Key       Value
ABC       10
TCA       13
RTY       23
FTY       45

The keys in the left-hand column are unique; no duplicates are allowed.

Output :

FTY       45

Since 45 is the highest of all values, it has to be printed along with the key.

I have written the MapReduce code based on the pseudocode shared in this link: How to design the Key Value pairs for Mapreduce to find the maximum value in a set?

Map -

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class Map 
            extends Mapper<LongWritable,Text,Text,IntWritable>
{

private Text maxKey = new Text();
private IntWritable maxValue = new IntWritable(Integer.MIN_VALUE);

@Override
protected void map( LongWritable key,Text value,Context context) 
                        throws IOException,InterruptedException
{
    String line = value.toString().trim();
    StringTokenizer token = new StringTokenizer(line);

    if(token.countTokens() == 2)
    {
        String str = token.nextToken();

        while(token.hasMoreTokens())
        {
            int temp = Integer.parseInt(token.nextToken());

            if(temp > maxValue.get())
            {
                maxValue.set(temp);
                maxKey.set(str);
            }
        }
    }

}

@Override
protected void cleanup(Context context)
        throws IOException,InterruptedException
{
    context.write(maxKey,maxValue);
}
}

Reduce

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class Reduce 
                extends Reducer<Text,IntWritable,Text,IntWritable>
{

private Text maxKey = new Text();
private IntWritable maxValue = new IntWritable(Integer.MIN_VALUE);

@Override
protected void reduce(Text key,Iterable<IntWritable> values,Context context)
                                        throws IOException,InterruptedException
    {
        Iterator<IntWritable> itr = values.iterator();

        while(itr.hasNext())
        {
            int temp = itr.next().get();
            if(temp > maxValue.get())
            {
                maxKey.set(key);
                maxValue.set(temp);
            }
        }

    }

@Override
protected void cleanup(Context context)
        throws IOException,InterruptedException
{
    context.write(maxKey,maxValue);
}
}

Driver class:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class MapReduceDriver
{
public static void main(String[] args) throws Exception
{
    Job job = new Job();

    job.setJarByClass(MapReduceDriver.class);
    job.setJobName("DNA Codon Analysis - Part 2");


    FileInputFormat.addInputPath(job,new Path(args[0]));
    FileOutputFormat.setOutputPath(job,new Path(args[1]));

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setNumReduceTasks(1);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true)?0:1);

}
}

The program compiles and runs, but produces this output -

     -2147483648

Probably the setting of maxValue in map() and reduce() is not correct. How do I set the value correctly (both initialising it with Integer.MIN_VALUE and updating it after comparison) so that the correct key-value pairs are received by the reduce() function?

Comments:

Also, notice that the key didn't get written either. So most probably, the variables maxKey and maxValue remained the same throughout the lifecycle of the program. – philantrovert

On a different note: why aren't you using Spark for this? stackoverflow.com/questions/38811877/… – user238607

I am a Hadoop beginner and just started learning the concepts of MapReduce in Java. Thanks for your suggestion! I will definitely keep it in mind and will use Spark once I advance there. – Monami Sen

1 Answer


Since your keys are always unique, you won't be able to aggregate them in the reducer. Therefore, if your dataset isn't extremely large, you can write the mapper output with one common key, which forces all of the mapper output to go to a single reducer.

And then in the reducer, you can iterate over the values to compare and write the maximum value along with the key.

In the mapper class, write each record to the context with a common key:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Map extends Mapper<LongWritable, Text, Text, Text> {

    private final Text commonKey = new Text("CommonKey");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        String[] kvpair = line.split("\\s+");
        // Emit every record under the same key so they all reach a single reducer
        context.write(commonKey, new Text(kvpair[0] + "," + kvpair[1]));
    }
}

And then in the reducer, find the maximum value and write it to the context:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reduce extends Reducer<Text, Text, Text, IntWritable> {

    @Override
    protected void reduce(Text commonKey, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int finalMax = Integer.MIN_VALUE;
        String finalKey = null;

        for (Text value : values) {
            // Each value is "originalKey,number"
            String[] kvpair = value.toString().trim().split(",");
            int current = Integer.parseInt(kvpair[1]);
            if (current > finalMax) {
                finalKey = kvpair[0];
                finalMax = current;
            }
        }

        // Write the key that carried the overall maximum value
        context.write(new Text(finalKey), new IntWritable(finalMax));
    }
}

Expect some errors in the code; I just wrote it in a text editor to give you a rough idea of how you can handle your problem differently.
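
One more thing to check, beyond what the answer shows: with this approach the mapper emits Text values while the job's final output value is an IntWritable, so the driver also needs to declare the intermediate (map output) types explicitly, otherwise Hadoop assumes they match the final output types and the job fails with a type mismatch. A minimal sketch of the relevant driver lines, assuming the Map and Reduce classes above:

// Driver adjustments (sketch): declare the map output types separately from
// the final output types, because they no longer match.
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setNumReduceTasks(1);                 // a single reducer sees every "CommonKey" record

job.setMapOutputKeyClass(Text.class);     // mapper emits Text -> Text
job.setMapOutputValueClass(Text.class);

job.setOutputKeyClass(Text.class);        // reducer emits Text -> IntWritable
job.setOutputValueClass(IntWritable.class);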