Problem statement - Find the maximum value and print it along with the key
Input :
Key Value
ABC 10
TCA 13
RTY 23
FTY 45
The keys in the left-hand column are unique; no duplicates are allowed.
Output :
FTY 45
Since 45 is the highest of all values, it has to be printed along with the key.
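For reference, the expected result can be computed without Hadoop in a single pass over the lines. The following is only a sketch for checking the job's output: the class `MaxPairReference` and its method `findMax` are hypothetical names, and it assumes that a header line such as "Key Value" may be present and should be skipped.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the MapReduce job: computes the expected
// answer in plain Java so the job's output can be compared against it.
final class MaxPairReference {

    // Returns the (key, value) pair with the largest value, or null if no
    // line has the form "<key> <integer>".
    static Map.Entry<String, Integer> findMax(List<String> lines) {
        Map.Entry<String, Integer> best = null;
        for (String line : lines) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length != 2) {
                continue; // skip blank or malformed lines
            }
            int value;
            try {
                value = Integer.parseInt(parts[1]);
            } catch (NumberFormatException e) {
                continue; // skip a non-numeric line such as the "Key Value" header
            }
            if (best == null || value > best.getValue()) {
                best = new SimpleEntry<>(parts[0], value);
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "Key Value", "ABC 10", "TCA 13", "RTY 23", "FTY 45");
        Map.Entry<String, Integer> max = findMax(input);
        System.out.println(max.getKey() + " " + max.getValue()); // prints "FTY 45"
    }
}
```

Tracking "no result yet" with `null` instead of a sentinel such as `Integer.MIN_VALUE` makes the case where no line was accepted distinguishable from a real maximum.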
I have written the MapReduce code based on the pseudocode shared in the question "How to design the Key Value pairs for Mapreduce to find the maximum value in a set?"
Map class:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class Map
        extends Mapper<LongWritable, Text, Text, IntWritable>
{
    private Text maxKey = new Text();
    private IntWritable maxValue = new IntWritable(Integer.MIN_VALUE);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
    {
        String line = value.toString().trim();
        StringTokenizer token = new StringTokenizer(line);
        if (token.countTokens() == 2)
        {
            String str = token.nextToken();
            while (token.hasMoreTokens())
            {
                int temp = Integer.parseInt(token.nextToken());
                if (temp > maxValue.get())
                {
                    maxValue.set(temp);
                    maxKey.set(str);
                }
            }
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException
    {
        context.write(maxKey, maxValue);
    }
}
Reduce class:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class Reduce
        extends Reducer<Text, IntWritable, Text, IntWritable>
{
    private Text maxKey = new Text();
    private IntWritable maxValue = new IntWritable(Integer.MIN_VALUE);

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException
    {
        Iterator<IntWritable> itr = values.iterator();
        while (itr.hasNext())
        {
            int temp = itr.next().get();
            if (temp > maxValue.get())
            {
                maxKey.set(key);
                maxValue.set(temp);
            }
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException
    {
        context.write(maxKey, maxValue);
    }
}
Driver class:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapReduceDriver
{
    public static void main(String[] args) throws Exception
    {
        Job job = new Job();
        job.setJarByClass(MapReduceDriver.class);
        job.setJobName("DNA Codon Analysis - Part 2");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
The program compiles and runs, but it produces this output:
-2147483648
The handling of maxValue in map() and reduce() is probably incorrect. How should the value be set (both initialising it with Integer.MIN_VALUE and updating it after the comparison) so that the correct key-value pairs are received by the reduce() function?
key didn't get written either. So most probably, the variables maxKey and maxValue remained the same through the lifecycle of the program. – philantrovert
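One way to test the hypothesis in the comment above is to replay the mapper's map()/cleanup() life cycle in a plain JVM. The sketch below is a hypothetical stand-in (`LocalMaxMapper` is not part of the job) with the Hadoop types replaced by `String` and `int`. It shows that when no line passes the `countTokens() == 2` check, cleanup emits the untouched initial state: an empty key and -2147483648. The comma-separated lines are only an illustrative assumption, since the question does not show the actual input file. The `sawRecord` flag and the guarded cleanup are likewise assumptions added for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical plain-Java mirror of the Map class, for local experiments only.
final class LocalMaxMapper {
    String maxKey = "";               // mirrors new Text()
    int maxValue = Integer.MIN_VALUE; // mirrors new IntWritable(Integer.MIN_VALUE)
    boolean sawRecord = false;        // assumption: extra flag, not in the original code

    // Same token handling as the original map(): only lines with exactly
    // two whitespace-separated tokens are considered.
    void map(String line) {
        StringTokenizer token = new StringTokenizer(line.trim());
        if (token.countTokens() == 2) {
            String key = token.nextToken();
            int value = Integer.parseInt(token.nextToken());
            sawRecord = true;
            if (value > maxValue) {
                maxValue = value;
                maxKey = key;
            }
        }
    }

    // What the original cleanup() writes: the current state, unconditionally.
    String cleanupUnguarded() {
        return maxKey + "\t" + maxValue;
    }

    // Guarded variant: returns null (write nothing) if no record was accepted.
    String cleanupGuarded() {
        return sawRecord ? maxKey + "\t" + maxValue : null;
    }

    // Feeds every line to a fresh mapper, as one map task would.
    static LocalMaxMapper run(List<String> lines) {
        LocalMaxMapper mapper = new LocalMaxMapper();
        for (String line : lines) {
            mapper.map(line);
        }
        return mapper;
    }

    public static void main(String[] args) {
        LocalMaxMapper ok = run(Arrays.asList("ABC 10", "TCA 13", "RTY 23", "FTY 45"));
        System.out.println("[" + ok.cleanupUnguarded() + "]");

        // Assumed malformed input: one token per line, so nothing is accepted.
        LocalMaxMapper bad = run(Arrays.asList("ABC,10", "FTY,45"));
        System.out.println("[" + bad.cleanupUnguarded() + "]");
        System.out.println(bad.cleanupGuarded());
    }
}
```

If the mapper emits the pair ("", Integer.MIN_VALUE), the reducer's test `temp > maxValue.get()` is false for it, so the reducer also keeps its initial state and writes the same pair. This is consistent with the observed output, but it is only one possible explanation. Checking the job counters for map input and output records would help confirm whether any line was actually accepted.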