Hi, I have code written in Hadoop and I am now trying to migrate it to Spark. The mappers and reducers are fairly complex, so I would like to reuse the already existing Hadoop Mapper and Reducer classes inside my Spark program. Can somebody tell me how to achieve this?
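To make the question concrete, the kind of bridging I have in mind looks roughly like this (a rough, untested sketch, not my real code; the class name HadoopBridgeSketch and the args[0] input path are just placeholders). It reads the input through the old org.apache.hadoop.mapred InputFormat so the RDD already carries the (LongWritable, Text) pairs that my existing mappers expect:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class HadoopBridgeSketch {
    public static void main(String[] args) {
        // Master and other settings are expected to come from spark-submit.
        JavaSparkContext ctx = new JavaSparkContext(new SparkConf().setAppName("hadoop-bridge-sketch"));

        // Read the file with the old-API Hadoop InputFormat, so the RDD carries
        // the same (LongWritable, Text) key/value types the existing mapper uses.
        JavaPairRDD<LongWritable, Text> records =
                ctx.hadoopFile(args[0], TextInputFormat.class, LongWritable.class, Text.class);

        // The open question: how do I run my existing Mapper and Reducer classes over 'records'?
        System.out.println("record count: " + records.count());
        ctx.stop();
    }
}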
EDIT:
So far, I have been able to reuse the Mapper class of the standard Hadoop word-count example in Spark, implemented as below.
wordcount.java
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import java.io.*;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
public final class wordcount extends Configured implements Serializable {

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setMaster("spark://IMPETUS-I0203:7077").setAppName("wordcount");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf); // create the Spark context
        JavaRDD<String> rec = ctx.textFile("hdfs://localhost:54310/input/words.txt"); // record reader

        // Create a pair RDD whose key = some arbitrary number and value = one record (a line of text).
        JavaPairRDD<LongWritable, Text> lines =
                rec.mapToPair(s -> new Tuple2<LongWritable, Text>(new LongWritable(s.length()), new Text(s)));

        // Transform 'lines' into ('word', 1) tuples by pushing each record through the Hadoop mapper.
        JavaPairRDD<Text, IntWritable> ones = lines.flatMapToPair(it -> {
            JobConf conf = new JobConf(new Configuration(), wordcount.class);
            conf.setJobName("WordCount");
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);

            Path inp = new Path("hdfs://localhost:54310/input/darcy.txt");
            Path out = new Path("hdfs://localhost:54310/output/41");
            FileInputFormat.addInputPath(conf, inp);
            FileOutputFormat.setOutputPath(conf, out);

            WordCountMapper mapper = new WordCountMapper();
            mapper.configure(conf);

            outputcollector<Text, IntWritable> output = new outputcollector<Text, IntWritable>();
            mapper.map(it._1, it._2, output, Reporter.NULL);
            return output.getList();
        });

        ones.saveAsTextFile("hdfs://localhost:54310/output/41");
    }
}
WordCountMapper.java
import java.io.*;
import java.util.StringTokenizer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import java.io.Serializable;
public class WordCountMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>, Serializable {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            // Allocate a fresh Text per token, because the collector keeps a reference to it.
            word = new Text();
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}
outputcollector.java
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.mapred.*;
import scala.Tuple2;
public class outputcollector<K, V> implements OutputCollector<K, V> {

    // Buffer every (key, value) pair the Hadoop mapper emits so it can be returned as the flatMap result.
    private List<Tuple2<K, V>> writer = new ArrayList<Tuple2<K, V>>();

    @Override
    public void collect(K key, V value) {
        writer.add(new Tuple2<K, V>(key, value));
    }

    public List<Tuple2<K, V>> getList() {
        return writer;
    }
}
This code works and I can successfully submit the Spark job, but it is highly inefficient compared to a pure Spark program: it takes about 50 times longer than the simple Spark word-count example. The input file is 1 GB, it lives on HDFS, and I am running in standalone mode.
I cannot figure out why this code is so slow. Here, WordCountMapper.java does nothing more than emit the (word, 1) pairs, and everything runs in memory, so I don't see why my code should be this much slower than the standard Spark word-count example.
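For reference, one restructuring I have been considering (an untested sketch, assuming the Spark 1.x Iterable-returning flatMap API that my code above relies on) is to replace the flatMapToPair block in wordcount.java with mapPartitionsToPair, so the JobConf and the mapper are built once per partition instead of once per input line:
JavaPairRDD<Text, IntWritable> ones = lines.mapPartitionsToPair(iter -> {
    // Set up the Hadoop plumbing once per partition, not once per record.
    JobConf conf = new JobConf(new Configuration(), wordcount.class);
    conf.setJobName("WordCount");
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    WordCountMapper mapper = new WordCountMapper();
    mapper.configure(conf);

    // Reuse the same collector for every record in the partition.
    outputcollector<Text, IntWritable> output = new outputcollector<Text, IntWritable>();
    while (iter.hasNext()) {
        Tuple2<LongWritable, Text> record = iter.next();
        mapper.map(record._1, record._2, output, Reporter.NULL);
    }
    mapper.close();
    return output.getList();
});
I have not measured this variant yet, so I don't know whether the per-record setup really is the bottleneck or whether something else is going on.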
So, can anyone suggest a better approach to reusing WordCountMapper.java (the Hadoop mapper) in Spark, or explain why it is so slow? Or anything else that helps me achieve my ultimate goal (stated in my question at the top)?