2
votes

I was wondering how the distributed mahout recommender job org.apache.mahout.cf.taste.hadoop.item.RecommenderJob handled csv files where duplicate and triplicate user,item entries exist but with different preference values. For example, if I had a .csv file that had entries like

1,1,0.7
1,2,0.7
1,2,0.3
1,3,0.7
1,3,-0.7

How would Mahout's datamodel handle this? Would it sum up the preference values for a given user,item entry (e.g. for user item 1,2 the preference would be (0.7 + 0.3)), or does it average the values (e.g. for user item 1,2 the preference is (0.7 + 0.3)/2) or does it default to the last user,item entry it detects (e.g. for user 1,2 the preference value is set to 0.3).

I ask this question because I am considering recommendations based on multiple preference metrics (item views, likes, dislikes, saves to shopping cart, etc.). It would be helpful if the datamodel treated the preference values as linear weights (e.g. an item view plus a save to wish list has a higher preference score than an item view alone). If the datamodel already handles this by summing, it would save me the chore of an additional map-reduce to sort and calculate total scores based on multiple metrics. Any clarification anyone could provide on how the Mahout .csv datamodel works in this respect for org.apache.mahout.cf.taste.hadoop.item.RecommenderJob would be really appreciated. Thanks.

2
Seems like this can be solved by using the R implementation of the K-Means algorithm. Just wanted to share the info. – Swamy

2 Answers

5
votes

No, it overwrites. The model is not additive. However, the model in Myrrix, a derivative of this code (that I'm commercializing), has a fundamentally additive data model, for just the reason you give. The input values are weights and are always added.

1
votes

Merge the duplicate user,item entries (e.g. by summing their preference values) before starting the computation.

examples:

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public final class Merge {
    public Merge() {
    }

    public static class MergeMapper extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, FloatWritable> {

        public void map(LongWritable key, Text value, OutputCollector<Text, FloatWritable> collector,
                Reporter reporter) throws IOException {
            // TODO Auto-generated method stub
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            if (tokenizer.hasMoreTokens()) {
                String userId = tokenizer.nextToken(",");
                String itemId = tokenizer.nextToken(",");
                FloatWritable score = new FloatWritable(Float.valueOf(tokenizer.nextToken(",")));
                collector.collect(new Text(userId + "," + itemId), score);
            }
            else {
                System.out.println("empty line " + line);
            }

        }
    }

    public static class MergeReducer extends MapReduceBase implements
            Reducer<Text, FloatWritable, Text, FloatWritable> {

        public void reduce(Text key, Iterator<FloatWritable> scores,
                OutputCollector<Text, FloatWritable> collector, Reporter reporter) throws IOException {
            // TODO Auto-generated method stub
            float sum = 0.0f;
            while (scores.hasNext()) {
                sum += scores.next().get();
            }
            if (sum != 0.0)
                collector.collect(key, new FloatWritable(sum));
        }
    }


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        JobConf conf = new JobConf(Merge.class);
        conf.setJobName("Merge Data");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(FloatWritable.class);

        conf.setMapperClass(MergeMapper.class);
        // combine the same key items
        conf.setCombinerClass(MergeReducer.class);
        conf.setReducerClass(MergeReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.set("mapred.textoutputformat.separator", ",");
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path("hdfs://localhost:49000/tmp/data"));
        FileOutputFormat.setOutputPath(conf, new Path("hdfs://localhost:49000/tmp/data/output"));

        JobClient.runJob(conf);
    }
}