I wrote the MapReduce code below; it has a Driver, a Mapper and a Reducer. The Mapper reads each record from the input file and finds the delay reason for the flight, keyed by (flight number, origin, destination, airport). The Reducer should iterate over the values for each key and emit the reason with the maximum delay for that key, but I see records with duplicate keys in the Reducer output. Is my Reducer code not being run, or is the code wrong? My logic should not produce duplicate keys/records.
Driver Class:
package com.airlines.driver;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.airlines.reducer.MainReasonForDelayReducer;
import com.arilines.mapper.MainReasonForDelayMapper;
/**
 * Command-line entry point that configures and submits the "main reason for delay" job.
 *
 * <p>Usage: {@code MainReasonForDelayDriver <input path> <output path>}. Any existing output
 * directory is removed before the job is submitted, and the process exit status reflects the
 * job result (0 on success, 1 on failure, 2 on bad usage).
 */
public class MainReasonForDelayDriver {
    /**
     * Builds the job from the two path arguments, runs it and reports the outcome.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws IOException            if the filesystem or job submission fails
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException   if waiting for job completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: MainReasonForDelayDriver <input path> <output path>");
            System.exit(2);
        }
        Configuration c = new Configuration();
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        // Resolve the filesystem from the output path and the job's own configuration, so the
        // delete targets the filesystem the job will actually write to.
        FileSystem fs = output.getFileSystem(c);
        // true stands for recursively deleting the folder you gave
        fs.delete(output, true);
        Job job = Job.getInstance(c, "Airliines - main reason for delay");
        job.setJarByClass(MainReasonForDelayDriver.class);
        job.setMapperClass(MainReasonForDelayMapper.class);
        job.setReducerClass(MainReasonForDelayReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        int result = job.waitForCompletion(true) ? 0 : 1;
        // Print a Message with the Job Status
        if (result == 0) {
            System.out.println("-----JOB SUCESS-----");
        } else {
            System.out.println("--------- JOB FAILED -----------");
        }
        // Propagate the job outcome so shell scripts and schedulers can detect failure.
        System.exit(result);
    }
}
Mapper Class:
package com.arilines.mapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MainReasonForDelayMapper extends Mapper<LongWritable, Text, Text, Text> {
public static final int flight_Num = 9;
public static final int origin = 16;
public static final int destination = 17;
public static final int airport = 31;
public static final int carrierDelay = 24;
public static final int weatherDelay = 25;
public static final int NASDelay = 26;
public static final int securityDelay = 27;
public static final int lateAircraftDelay = 28;
public static final int sumOfDelays = 29;
public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException {
String outline = "";
String line = value.toString();
String[] words = line.split(",");
Map<String, Integer> delayValues = new HashMap<>();
delayValues.put("carrierDelay", Integer.parseInt(words[carrierDelay]));
delayValues.put("weatherDelay", Integer.parseInt(words[weatherDelay]));
delayValues.put("NASDelay", Integer.parseInt(words[NASDelay]));
delayValues.put("securityDelay", Integer.parseInt(words[securityDelay]));
delayValues.put("lateAircraftDelay", Integer.parseInt(words[lateAircraftDelay]));
int max = 0;
List<String> keys = new ArrayList<>();
keys.addAll(delayValues.keySet());
for (int i = 0; i < delayValues.size(); i++) {
if (delayValues.get(keys.get(i)) >= max) {
max = delayValues.get(keys.get(i));
}
}
String delayReason = "no delay";
if (max != 0) {
delayReason = (String) getKeyFromValue(delayValues, max);
}
outline = max + "," + delayReason;
Text outlinekey = new Text(
words[flight_Num] + "," + words[origin] + "," + words[destination] + "," + words[airport]);
con.write(outlinekey, new Text(outline));
}
public static Object getKeyFromValue(Map hm, Object value) {
for (Object o : hm.keySet()) {
if (hm.get(o).equals(value)) {
return o;
}
}
return null;
}
}
Reducer Class:
package com.airlines.reducer;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MainReasonForDelayReducer extends Reducer<Text, Text, Text, Text> {
public void reducer(Text key, Iterator<Text> value, Context con) throws IOException, InterruptedException {
String outline = "";
int maxDelay = 0;
String delayReason = "no delay";
System.out.println(key + "reducer values.... " + value);
while (value.hasNext()) {
String tokens[] = value.next().toString().split(",");
if (Integer.valueOf(tokens[0]) > maxDelay) {
maxDelay = Integer.valueOf(tokens[0]);
delayReason = tokens[1];
}
}
outline = maxDelay + "," + delayReason;
con.write(key, new Text(outline));
}
}
Sample Output data:
3866,ABI,DFW,Abilene Regional 0,no delay
3866,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
3892,ABI,DFW,Abilene Regional 0,no delay
Example key and value:
Key - 3892,ABI,DFW,Abilene Regional
Value - 0,no delay