I am trying to convert a String into a Text object and output the value to the Reducer as MapOutputKey. However, in the reducer, the key looks like a black space with value(not the MapOutputValue) as a memory location. However, if i hardcode a String into the text object and send it out, I am able to see the proper values in the Reducer. EDIT:- Now this is very strange AND interesting. The same program works fine for a smaller dataset,i.e, The text key sent from the mapper is received in the reducer. But if i use the original dataset, the text key sent from map does not reflect in the reducer. I don'n know, there is something about Hadoop that i am not aware of. Here is the entire source code:-
public class InvertedIndex {
public InvertedIndex(){}
public static class InvertedMap extends Mapper<IntWritable,Text,Text,DocAttributes> {
Integer docNum;
String word;
Integer docFrequency;
Integer termFrequency;
private Text key = new Text();
@Override
public void map(IntWritable mapIn,Text mapValIn,Context context) throws IOException,InterruptedException{
try{
String line = mapValIn.toString();
String[] words = line.trim().split("\\s");
List<String> wordList = new ArrayList<String>(Arrays.asList(words));
String k;
for(int i=0; i<wordList.size();i++){
k = wordList.get(i);
//Text key = new Text(k);
// DocAttributes da = new DocAttributes();
key.set(k);
int sum=1;
for(int j=i;j<wordList.size();j++){
if(wordList.get(i).matches(wordList.get(j)) && j>i){
sum++;
docNum = mapIn.get();
docFrequency = sum;
word = wordList.get(i);
termFrequency = sum;
wordList.remove(j);
}
else{
docNum = mapIn.get();
docFrequency = sum;
word = wordList.get(i);
termFrequency = sum;
}
}
if(i == words.length-1){
docNum = mapIn.get();
docFrequency = sum;
word = wordList.get(i);
termFrequency = sum;
}
context.write(key, new DocAttributes(docNum,word,docFrequency,termFrequency));
}
}
catch(NullPointerException ne){
ne.printStackTrace();
}
}
}
public static class InvertedReduce extends Reducer<Text,DocAttributes,LongWritable,DocAttributes>{
@Override
public void reduce(Text key, Iterable<DocAttributes> value,Context context) throws IOException,InterruptedException{
Iterator<DocAttributes> iterator = value.iterator();
DocAttributes doc = new DocAttributes();
List<DocAttributes> list = new ArrayList<DocAttributes>();
while(iterator.hasNext()){
list.add(new DocAttributes(iterator.next()));
}
Integer docFrequency = 0;
for(DocAttributes d : list){
docFrequency += d.getDocFrequency();
doc.setDocNum(d.getDocNum());
doc.setWord(d.getWord());
doc.setDocFrequency(docFrequency);
doc.setTermFrequency(d.getTermFrequency());
}
context.write(new LongWritable(), doc);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setMapperClass(InvertedMap.class);
// job.setCombinerClass(InvertedCombine.class);
job.setReducerClass(InvertedReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DocAttributes.class);
job.setJarByClass(InvertedIndex.class);
TextInputFormat.addInputPath(job, new Path(args[0]));
TextOutputFormat.setOutputPath(job,new Path(args[1]));
job.setInputFormatClass(DocInput.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.submit();
job.waitForCompletion(true);
}
}