In Hadoop, I am writing a custom data type, shown below:
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class Movie implements WritableComparable { String movieId; String movieTitle; public Movie(String movieId, String movieTitle) { super(); this.movieId = movieId; this.movieTitle = movieTitle; } public Movie(){ } public String getMovieId() { return movieId; } public void setMovieId(String movieId) { this.movieId = movieId; } public String getMovieTitle() { return movieTitle; } public void setMovieTitle(String movieTitle) { this.movieTitle = movieTitle; } @Override public void readFields(DataInput in) throws IOException { movieId = in.readLine(); movieTitle=in.readLine(); } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeChars(movieId); out.writeChars(movieTitle); } @Override public int compareTo(Movie o) { return movieTitle.compareTo(o.movieTitle); } @Override public int hashCode(){ return movieId.hashCode(); } @Override public boolean equals(Object o){ Movie m=(Movie)o; return movieId.equals(m.movieId); } @Override public String toString(){ return movieTitle; } }
Below is my mapper code:
import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MovieMapper extends Mapper { Map> movieMap=new HashMap>(); @Override public void map(LongWritable key,Text value,Context ctx) throws IOException, InterruptedException{ String[] columns=value.toString().split("::"); if(columns.length!=4){ System.out.println("length not equal to 4"); return; } if(movieMap.containsKey(columns[1])){ List mList=movieMap.get(columns[1]); // set movie //System.out.println("In mapper, movieId="+mList.get(0)+", name="+mList.get(1)); Movie movie=new Movie(mList.get(0),mList.get(1)); //movie.setMovieId(mList.get(0)); //movie.setMovieTitle(mList.get(1)); // set MovieRating MovieRating mr=new MovieRating(); mr.setUserId(columns[0]); mr.setRating(Integer.parseInt(columns[2])); mr.setTime(columns[3]); ctx.write(movie,mr); } } @Override protected void setup(Context ctx) throws IOException { loadMovieData(ctx); } public void loadMovieData(Context ctx) throws IOException{ URI[] cacheFiles = DistributedCache.getCacheFiles(ctx.getConfiguration()); System.out.println("inloadMovieData"); if(cacheFiles!=null && cacheFiles.length>0){ System.out.println("Cache files length greater then 0"); for(URI path:cacheFiles){ System.out.println("Cache file="+path.toString()); BufferedReader reader=null; try{ reader=new BufferedReader(new FileReader(path.toString())); String line; while((line=reader.readLine())!=null){ String[] columns = line.split("::"); movieMap.put(columns[0], new ArrayList(Arrays.asList(columns))); } }catch(Exception e){ e.printStackTrace(); } finally{ reader.close(); } } } } }
In the mapper class, when control reaches ctx.write(movie, mr), the job fails with a "spill failed" error. My reducer takes Movie as its input key and MovieRating as its input value.