First, identify the keys and values for the MapReduce job.
In this case, you need the total call duration for every mobileNumber-date-operator combination.
So the mapper emits one pair per input line: the key is that combination, and the value is the duration of the call on that line.
The reducer then sums the durations for each unique key (combination).
Please go through the example below to understand the logic.
I concentrated mostly on the logic, so you may need to adjust the string/date formatting and the line splitting/tokens to match your data.
package stackoverflow.examples;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CallStatsJob {

    public static class CallStatsMapper extends
            Mapper<Object, Text, Text, LongWritable> {

        private LongWritable duration;
        private Text key = new Text();
        private String mobileNumber, startTime, endTime, operator;

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Fields are separated by " | ": number, start time, end time, operator
            String[] words = value.toString().split(" \\| ");
            mobileNumber = words[0];
            startTime = words[1];
            endTime = words[2];
            operator = words[3];
            // for debugging
            // System.out.println(mobileNumber);
            // System.out.println(startTime);
            // System.out.println(endTime);
            // System.out.println(operator);
            // HH is the 24-hour clock (assumed input format);
            // the 12-hour hh would parse 12:16 as 00:16.
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm dd/M/yyyy");
            // String dateInString = "03:40 12/10/2013";
            Date stDate, enDate;
            try {
                stDate = sdf.parse(startTime);
                enDate = sdf.parse(endTime);
                long diff = enDate.getTime() - stDate.getTime();
                long diffMinutes = diff / (60 * 1000);
                // getDate() is deprecated but still returns the day of the month.
                // The key holds only that day; add month and year if your data
                // spans more than one month.
                this.key.set(mobileNumber + "-" + stDate.getDate() + "-" + operator);
                duration = new LongWritable(diffMinutes);
                context.write(this.key, duration);
            } catch (ParseException e) {
                // Lines with unparseable dates are logged and skipped
                e.printStackTrace();
            }
        }
    }

    public static class CallStatsReducer extends
            Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        public void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            // Sum the durations of all calls that share this key
            long sum = 0L;
            for (LongWritable val : values) {
                sum += val.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // On Hadoop 2.x and later, Job.getInstance(conf, "Caller Statistics")
        // is the non-deprecated way to create the job.
        Job job = new Job(conf, "Caller Statistics");
        job.setJarByClass(CallStatsJob.class);
        job.setMapperClass(CallStatsMapper.class);
        job.setReducerClass(CallStatsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // args[0] = input path, args[1] = output path (must not exist yet)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
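If you adjust the date pattern for your own data, be careful with the hour letter: in SimpleDateFormat, hh is the 12-hour clock and HH is the 24-hour clock. The sketch below is only an illustration outside Hadoop; the class DatePatternSketch, the helper minutesBetween and the two timestamps are made up for this example, and the UTC time zone is an assumption chosen to keep the result independent of the machine it runs on.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

// Hypothetical helper class, not part of the job above.
class DatePatternSketch {

    // Parses both timestamps with the given pattern and returns the gap in whole minutes.
    static long minutesBetween(String pattern, String start, String end) {
        SimpleDateFormat sdf = new SimpleDateFormat(pattern);
        // Fixed zone (an assumption) so daylight-saving changes cannot affect the result.
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            long millis = sdf.parse(end).getTime() - sdf.parse(start).getTime();
            return millis / (60 * 1000);
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable timestamp", e);
        }
    }

    public static void main(String[] args) {
        // Made-up sample values for a call that crosses noon
        String start = "11:50 12/10/2013";
        String end = "12:16 12/10/2013";
        // 24-hour pattern: prints 26
        System.out.println(minutesBetween("HH:mm dd/M/yyyy", start, end));
        // 12-hour pattern without an AM/PM marker: 12:16 is read as 00:16,
        // so the printed duration is negative
        System.out.println(minutesBetween("hh:mm dd/M/yyyy", start, end));
    }
}
```

If your input really uses 12-hour times, the pattern would also need an AM/PM marker (the letter a) so that afternoon calls are not misread.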
Mapper output (you can see this by setting the number of reducers to 0, for example with job.setNumReduceTasks(0)):
9898765467-12-airtel 26
9898765467-12-vodaphone 45
9899875321-13-idea 26
9899875321-13-reliance 35
9899875321-13-idea 16
9898765467-12-vodaphone 35
Reducer output (the normal output of the job above):
9898765467-12-airtel 26
9898765467-12-vodaphone 80
9899875321-13-idea 42
9899875321-13-reliance 35
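To check the step from mapper output to reducer output by hand, the grouping and summing can be imitated in plain Java without a cluster. This is only a sketch of what the shuffle and the reducer do together; the class ReduceSketch and the method sumByKey are hypothetical names, and a sorted TreeMap stands in for Hadoop's grouping of values by key.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for shuffle + reduce, not part of the job above.
class ReduceSketch {

    // Groups (key, minutes) pairs by key and sums the minutes per key.
    static Map<String, Long> sumByKey(String[][] pairs) {
        Map<String, Long> totals = new TreeMap<>(); // sorted by key
        for (String[] pair : pairs) {
            totals.merge(pair[0], Long.parseLong(pair[1]), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // The mapper output listed above, as (key, minutes) pairs
        String[][] mapperOutput = {
            {"9898765467-12-airtel", "26"},
            {"9898765467-12-vodaphone", "45"},
            {"9899875321-13-idea", "26"},
            {"9899875321-13-reliance", "35"},
            {"9899875321-13-idea", "16"},
            {"9898765467-12-vodaphone", "35"},
        };
        // Prints the four lines of the reducer output shown above
        sumByKey(mapperOutput).forEach((k, v) -> System.out.println(k + " " + v));
    }
}
```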
I hope this example gives you both the solution and enough understanding to proceed further.