0
votes

I tried the MapReduce code below, which has a Driver, a Mapper and a Reducer. The mapper reads each record from the input file and finds the main delay reason of the flight for each key (flight number, origin, destination and airport). The reducer should iterate over the values for each key and emit the reason with the maximum delay, but I see records with duplicate keys in the reducer output. Is my reducer code not being run, or is the code wrong? My logic should not produce duplicate keys/records.

Driver Class:

package com.airlines.driver;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.airlines.reducer.MainReasonForDelayReducer;
import com.arilines.mapper.MainReasonForDelayMapper;


/**
 * Command-line entry point that configures and submits the
 * "main reason for delay" MapReduce job.
 *
 * <p>The job wires {@link MainReasonForDelayMapper} to
 * {@link MainReasonForDelayReducer} with {@link Text} keys and values.
 */
public class MainReasonForDelayDriver {

    /**
     * Runs the job and terminates the JVM with the job status.
     *
     * <p>Any existing output directory is deleted recursively before the job
     * is submitted. The process exits with {@code 0} when the job succeeds,
     * {@code 1} when it fails and {@code 2} when the arguments are missing.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws IOException            if the file system or job submission fails
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException   if the wait for job completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: MainReasonForDelayDriver <input path> <output path>");
            System.exit(2);
        }

        Configuration c = new Configuration();
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        // Resolve the file system from the output path itself (with the job's
        // configuration) so a fully qualified URI on a non-default file system
        // is deleted from the right place.
        FileSystem fs = output.getFileSystem(c);
        // true stands for recursively deleting the folder you gave
        fs.delete(output, true);

        Job job = Job.getInstance(c, "Airliines - main reason for delay");
        job.setJarByClass(MainReasonForDelayDriver.class);
        job.setMapperClass(MainReasonForDelayMapper.class);
        job.setReducerClass(MainReasonForDelayReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        int result = job.waitForCompletion(true) ? 0 : 1;
        // Print a Message with the Job Status
        if (result == 0) {
            System.out.println("-----JOB SUCESS-----");
        } else {
            System.out.println("--------- JOB FAILED -----------");
        }
        // Propagate the job status so shell scripts and schedulers can detect failure.
        System.exit(result);
    }

}

Mapper Class:

package com.arilines.mapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MainReasonForDelayMapper extends Mapper<LongWritable, Text, Text, Text> {
    public static final int flight_Num = 9;
    public static final int origin = 16;
    public static final int destination = 17;
    public static final int airport = 31;
    public static final int carrierDelay = 24;
    public static final int weatherDelay = 25;
    public static final int NASDelay = 26;
    public static final int securityDelay = 27;
    public static final int lateAircraftDelay = 28;
    public static final int sumOfDelays = 29;

    public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException {
        String outline = "";
        String line = value.toString();
        String[] words = line.split(",");
        Map<String, Integer> delayValues = new HashMap<>();
        delayValues.put("carrierDelay", Integer.parseInt(words[carrierDelay]));
        delayValues.put("weatherDelay", Integer.parseInt(words[weatherDelay]));
        delayValues.put("NASDelay", Integer.parseInt(words[NASDelay]));
        delayValues.put("securityDelay", Integer.parseInt(words[securityDelay]));
        delayValues.put("lateAircraftDelay", Integer.parseInt(words[lateAircraftDelay]));
        int max = 0;
        List<String> keys = new ArrayList<>();
        keys.addAll(delayValues.keySet());

        for (int i = 0; i < delayValues.size(); i++) {
            if (delayValues.get(keys.get(i)) >= max) {
                max = delayValues.get(keys.get(i));
            }
        }
        String delayReason = "no delay";
        if (max != 0) {
            delayReason = (String) getKeyFromValue(delayValues, max);
        }
        outline = max + "," + delayReason;
        Text outlinekey = new Text(
                words[flight_Num] + "," + words[origin] + "," + words[destination] + "," + words[airport]);
        con.write(outlinekey, new Text(outline));
    }

    public static Object getKeyFromValue(Map hm, Object value) {
        for (Object o : hm.keySet()) {
            if (hm.get(o).equals(value)) {
                return o;
            }
        }
        return null;
    }

}

Reducer Class:

package com.airlines.reducer;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MainReasonForDelayReducer extends Reducer<Text, Text, Text, Text> {

    public void reducer(Text key, Iterator<Text> value, Context con) throws IOException, InterruptedException {
        String outline = "";
        int maxDelay = 0;
        String delayReason = "no delay";
        System.out.println(key + "reducer values.... " + value);
        while (value.hasNext()) {
            String tokens[] = value.next().toString().split(",");
            if (Integer.valueOf(tokens[0]) > maxDelay) {
                maxDelay = Integer.valueOf(tokens[0]);
                delayReason = tokens[1];
            }
        }

        outline = maxDelay + "," + delayReason;

        con.write(key, new Text(outline));
    }

}

Sample Output data:

3866,ABI,DFW,Abilene Regional   0,no delay
3866,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay

Example key and value:

Key - 3892,ABI,DFW,Abilene Regional
Value - 0,no delay
1
Just a suggestion, but this could be done more easily in Hive/Spark – OneCricketeer

1 Answer

0
votes

I think the problem might be that your reducer is not properly overriding the reduce method.

public void reducer(Text key, Iterator<Text> value, Context con) throws IOException, InterruptedException {
        String outline = "";

The correct way to override would be

@Override
public void reduce(Text key, Iterable<Text> iterable_values, Context context) throws IOException , InterruptedException {

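Note that the method name is reduce, not reducer as you have written. Also note that the second parameter is Iterable<Text>, not Iterator<Text>, in the new version of the MapReduce API which you are using. Because your method has a different name and signature, it does not override anything and is never called; Hadoop runs the default reduce implementation instead, which writes every value out unchanged under its key. That is why you see duplicate keys in the output. Adding @Override lets the compiler catch this kind of mismatch.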

Iterator<Text> belongs to the older version of the API, which resides in the package org.apache.hadoop.mapred.

The newer version resides in the package org.apache.hadoop.mapreduce.