
Hi, I have code written in Hadoop and I am now trying to migrate it to Spark. The mappers and reducers are fairly complex, so I tried to reuse the Mapper and Reducer classes of the existing Hadoop code inside the Spark program. Can somebody tell me how to achieve this?

EDIT:
So far, I have been able to reuse the mapper class of the standard Hadoop word-count example in Spark, implemented as below.

wordcount.java

import scala.Tuple2; 

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

import java.io.*;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class wordcount  extends Configured  implements Serializable {
  public static void main(String[] args) throws Exception {

        SparkConf sparkConf = new SparkConf().setMaster("spark://IMPETUS-I0203:7077").setAppName("wordcount");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf); //created Spark context
        JavaRDD<String> rec = ctx.textFile("hdfs://localhost:54310/input/words.txt"); //Record Reader

  //creating a Pair RDD whose key = the line length (an arbitrary stand-in for the byte-offset key Hadoop would provide), value = the line itself
        JavaPairRDD<LongWritable,Text> lines =rec.mapToPair(s->new Tuple2<LongWritable,Text>(new LongWritable(s.length()),new Text(s)));

  //transforming the 'lines' RDD into one that emits, for example, ('word', 1) tuples
      JavaPairRDD<Text,IntWritable> ones = lines.flatMapToPair(it -> {

          // build a JobConf so the existing Hadoop mapper can be configured as usual;
          // input/output paths are handled by Spark (textFile/saveAsTextFile), so no
          // FileInputFormat/FileOutputFormat setup is needed here
          JobConf conf = new JobConf(new Configuration(), wordcount.class);
          conf.setJobName("WordCount");
          conf.setOutputKeyClass(Text.class);
          conf.setOutputValueClass(IntWritable.class);

          // instantiate and configure the original Hadoop mapper
          WordCountMapper mapper = new WordCountMapper();
          mapper.configure(conf);

          // custom OutputCollector that buffers the (key, value) pairs in a list
          outputcollector<Text,IntWritable> output = new outputcollector<Text,IntWritable>();

          mapper.map(it._1, it._2, output, Reporter.NULL);

          return output.getList();
        });

        ones.saveAsTextFile("hdfs://localhost:54310/output/41");
  }
}


WordCountMapper.java

import java.io.*;
import java.util.StringTokenizer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import java.io.Serializable;

public class WordCountMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>,Serializable
{
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens())
            {
               // allocate a fresh Text per token: the collector buffers object references,
               // so reusing a single Writable instance would overwrite keys collected earlier
               word = new Text();
               word.set(tokenizer.nextToken());
               output.collect(word, one);
            }
       }
}


outputcollector.java

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.mapred.*;
import scala.Tuple2;

// Minimal OutputCollector implementation that buffers (key, value) pairs in a list
// so the mapper's output can be returned to Spark from the flatMapToPair function.
public class outputcollector<K extends Object, V extends Object> implements OutputCollector<K, V>{
    private List<Tuple2<K,V>> writer = new ArrayList<Tuple2<K,V>>();
    @Override
    public void collect(K key, V value) {
        try{
            writer.add(new Tuple2<K,V>(key,value));
        }catch(Exception e){
            System.out.println(e+ "\n\n****output collector error\n\n");
        }
    }
    public List<Tuple2<K,V>> getList(){
        return writer;
    }
}


This code works and I can successfully submit the Spark job, but it is somehow highly inefficient compared to a pure Spark program: it takes about 50 times longer than the simple Spark word-count example. The input file is 1 GB, it resides on HDFS, and I am running in standalone mode.

I am unable to find the reason why this code is so slow. Here I am using WordCountMapper.java simply to collect the (word, 1) pairs, and that also works in memory, so I don't see why my code has to be so much slower than the standard Spark word-count example.
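
For reference, the plain Spark word count I am comparing against is roughly the standard Java example below (a minimal sketch; the app name and paths are placeholders, and it assumes the Spark 1.x Java API, where flatMap returns an Iterable):

import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public final class PlainWordCount {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("plain-wordcount");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);

    JavaRDD<String> lines = ctx.textFile("hdfs://localhost:54310/input/words.txt");

    // split each line into words, pair each word with 1, then sum the counts per word
    JavaPairRDD<String, Integer> counts = lines
        .flatMap(line -> Arrays.asList(line.split(" ")))   // Spark 1.x: flatMap returns an Iterable
        .mapToPair(word -> new Tuple2<>(word, 1))
        .reduceByKey((a, b) -> a + b);

    counts.saveAsTextFile("hdfs://localhost:54310/output/plain");
    ctx.stop();
  }
}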

So, can anyone suggest a better approach to reusing WordCountMapper.java (a Hadoop mapper) in Spark, explain why it is so slow, or suggest anything else that helps achieve my ultimate goal (stated in my question at the top)?

Comments:

You can't re-use it! – eliasah
You are doing a word count and you are saying that the mapper and reducer are complex? It's better to build your application from scratch, using the old MapReduce workflow as a guide! – eliasah
I have 20k lines of Hadoop code. My aim is to reuse part of that code in Spark. I've taken this simple word-count example because I am just researching whether I can reuse any Hadoop code in Spark at all. I want to explore every possible way of reusing Hadoop code. – Meliodas
Like I said before, you can't! You can only reuse the workflow; it's the same programming paradigm after all. – eliasah
You might want to take a look at this: Matei Zaharia said you could reuse it. mail-archives.us.apache.org/mod_mbox/spark-user/201406.mbox/… – Meliodas

1 Answer


The basic way of converting a MapReduce job to Spark is:

rdd.mapPartitions { partition =>
    setup()                              // map-side setup
    partition.map { item =>
        val output = process(item)       // per-record map logic
        if (!partition.hasNext) {
            cleanup()                    // map-side cleanup, after the last record
        }
        output
    }
}
.groupByKey()
.mapPartitions { partition =>
    // the reduce code follows the same setup/process/cleanup pattern
    ...
}
.saveAsHadoopFile(/* params */)          // to save on HDFS
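
Applied to the word count from the question, the map side of this pattern might look roughly as follows in Java. This is only a sketch: it reuses the WordCountMapper and outputcollector classes from the question, targets the Spark 1.x Java API, and builds the JobConf and the mapper once per partition rather than once per record.

// 'lines' is the JavaPairRDD<LongWritable, Text> built in the question.
JavaPairRDD<Text, IntWritable> ones = lines.mapPartitionsToPair(iter -> {
    // "setup": build the JobConf and configure the old mapper once per partition
    JobConf conf = new JobConf(new Configuration(), wordcount.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    WordCountMapper mapper = new WordCountMapper();
    mapper.configure(conf);

    // one collector buffers the (word, 1) pairs for the whole partition
    outputcollector<Text, IntWritable> output = new outputcollector<Text, IntWritable>();
    while (iter.hasNext()) {
        Tuple2<LongWritable, Text> record = iter.next();
        mapper.map(record._1, record._2, output, Reporter.NULL);
    }
    mapper.close();            // "cleanup"
    return output.getList();   // Spark 1.x: an Iterable of Tuple2s
});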

The following link points to a two-part article on the Cloudera blog. Not everything is covered, but if you go through it you will get the gist of how to convert parts of a Hadoop job into Spark, for example how to handle setup and cleanup.

http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
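
The reduce side follows the same shape. The sketch below assumes a hypothetical WordCountReducer implementing the old org.apache.hadoop.mapred.Reducer<Text, IntWritable, Text, IntWritable> interface (for example the classic summing reducer) and again reuses the outputcollector class from the question. One caveat: shuffling Hadoop Writable types through groupByKey requires a serializer that can handle them (for example Kryo), since Writables are not java.io.Serializable; converting keys and values to plain String/Integer before the shuffle is another option.

// 'ones' is the JavaPairRDD<Text, IntWritable> produced by the map side above.
// WordCountReducer is hypothetical: any old-API Reducer<Text, IntWritable, Text, IntWritable>.
JavaPairRDD<Text, IntWritable> counts = ones
    .groupByKey()
    .mapPartitionsToPair(iter -> {
        // "setup": configure the old-API reducer once per partition
        JobConf conf = new JobConf(new Configuration(), wordcount.class);
        WordCountReducer reducer = new WordCountReducer();
        reducer.configure(conf);

        outputcollector<Text, IntWritable> output = new outputcollector<Text, IntWritable>();
        while (iter.hasNext()) {
            Tuple2<Text, Iterable<IntWritable>> group = iter.next();
            // the old Reducer API expects a java.util.Iterator of values per key
            reducer.reduce(group._1, group._2.iterator(), output, Reporter.NULL);
        }
        reducer.close();           // "cleanup"
        return output.getList();
    });
counts.saveAsTextFile("hdfs://localhost:54310/output/42");   // output path is a placeholder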

Note: I have tried converting MapReduce code to Spark and it resulted in a slower application. Maybe that is due to my own inefficiency with Scala, or maybe Spark is not well suited for this kind of batch job, so be aware of this as well.