1
votes

In hadoop I am writing my custom data type as below

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Movie implements WritableComparable {

    String movieId;
    String movieTitle;

    public Movie(String movieId, String movieTitle) {
        super();
        this.movieId = movieId;
        this.movieTitle = movieTitle;
    }

    public Movie(){

    }

    public String getMovieId() {
        return movieId;
    }

    public void setMovieId(String movieId) {
        this.movieId = movieId;
    }

    public String getMovieTitle() {
        return movieTitle;
    }

    public void setMovieTitle(String movieTitle) {
        this.movieTitle = movieTitle;
    }



    @Override
    public void readFields(DataInput in) throws IOException {
        movieId = in.readLine();
        movieTitle=in.readLine();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
        out.writeChars(movieId);
        out.writeChars(movieTitle);
    }

    @Override
    public int compareTo(Movie o) {

        return movieTitle.compareTo(o.movieTitle);

    }

    @Override
    public int hashCode(){
        return movieId.hashCode();
    }

    @Override
    public boolean equals(Object o){
        Movie m=(Movie)o; 
        return movieId.equals(m.movieId);
    }

    @Override
    public String toString(){
        return movieTitle;
    }

}

Below is my mapper code

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MovieMapper extends Mapper {

    Map> movieMap=new HashMap>();

    @Override
    public void map(LongWritable key,Text value,Context ctx) throws IOException, InterruptedException{
        String[] columns=value.toString().split("::");
        if(columns.length!=4){
            System.out.println("length not equal to 4");
            return;
        }
        if(movieMap.containsKey(columns[1])){
            List mList=movieMap.get(columns[1]);
            // set movie
            //System.out.println("In mapper, movieId="+mList.get(0)+", name="+mList.get(1));
            Movie movie=new Movie(mList.get(0),mList.get(1));
            //movie.setMovieId(mList.get(0));
            //movie.setMovieTitle(mList.get(1));
            // set MovieRating
            MovieRating mr=new MovieRating();
            mr.setUserId(columns[0]);
            mr.setRating(Integer.parseInt(columns[2]));
            mr.setTime(columns[3]);
            ctx.write(movie,mr);
        }
    }


    @Override
    protected void setup(Context ctx) throws IOException {
        loadMovieData(ctx);
    }

    public void loadMovieData(Context ctx) throws IOException{
        URI[] cacheFiles = DistributedCache.getCacheFiles(ctx.getConfiguration());
        System.out.println("inloadMovieData");
        if(cacheFiles!=null && cacheFiles.length>0){
            System.out.println("Cache files length greater then 0");
            for(URI path:cacheFiles){
                System.out.println("Cache file="+path.toString());
                BufferedReader reader=null;
                try{
                    reader=new BufferedReader(new FileReader(path.toString()));
                    String line;
                    while((line=reader.readLine())!=null){
                        String[] columns = line.split("::");
                        movieMap.put(columns[0], new ArrayList(Arrays.asList(columns)));
                    }
                }catch(Exception e){
                    e.printStackTrace();
                }
                finally{
                    reader.close();
                }

            }


        }
    }

}

In mapper class when control reach to ctx.write(movie,mr), then it shows spill faild issue. My reducer is taking input key as Movie and value as MovieRating.

1

1 Answers

2
votes

Because you read lines (it is looking for \n in the stream) but you write characters (which don't include \n).

Your methods should look like this:

@Override
public void readFields(DataInput in) throws IOException {
    movieId = in.readUTF();
    movieTitle = in.readUTF();
}

@Override
public void write(DataOutput out) throws IOException {
    out.writeUTF(movieId);
    out.writeUTF(movieTitle);
}