0
votes

I currently have streaming jobs run with mapper and reducer code written in ruby. I want to convert these to java. I do not know how to run a streaming job with EMR hadoop using java. The sample given in amazon's EMR website of cloudburst is too complex. Following are the details of how I run the jobs currently.

Code to start a job:

        elastic-mapreduce --create --alive --plain-output --master-instance-type m1.small 
--slave-instance-type m1.xlarge --num-instances 2  --name "Job Name" --bootstrap-action 
    s3://bucket-path/bootstrap.sh

Code to add a step:

    elastic-mapreduce -j <job_id> --stream --step-name "my_step_name" 
--jobconf mapred.task.timeout=0 --mapper s3://bucket-path/mapper.rb 
--reducer s3://bucket-path/reducerRules.rb --cache s3://bucket-path/cache/cache.txt 
--input s3://bucket-path/input --output s3://bucket-path/output

Mapper code reads from a csv file which is mentioned above as EMR's cache argument as well as it reads from the input s3 bucket which also has some csv files, does some calculations and prints a csv output lines to standard output.

//mapper.rb 
CSV_OPTIONS  = {
  // some CSV options
}

begin
    file = File.open("cache.txt")
    while (line = file.gets)
        // do something
    end
    file.close
end

input  = FasterCSV.new(STDIN, CSV_OPTIONS)
input.each{ 
// do calculations and get result
puts (result)
}

//reducer.rb

$stdin.each_line do |line|
// do some aggregations and get aggregation_result
if(some_condition) puts(aggregation_result)
end
2

2 Answers

0
votes

You don't use streaming if you're using java. You build a Jar directly against the MapReduce API.

Check out the examples folder of the hadoop source for some good examples of how to do this, including the infamous wordcount: https://github.com/apache/hadoop/tree/trunk/src/examples/org/apache/hadoop/examples

I'm not totally sure why you want to use Java, but coding directly to the API's can be painful. You might want to try one of the following: Java Projects:

Non-Java:

FWIW I think Pig would probably be my pick, and is supported out of the box on EMR.

0
votes

Since now I have a better stronghold on Hadoop and Mapreduce, here is what I had expected:

To start a cluster, the code will remain more or less same as in the question but we can add config parameters:

ruby elastic-mapreduce --create --alive --plain-output --master-instance-type m1.xlarge --slave-instance-type m1.xlarge --num-instances 11  --name "Java Pipeline" --bootstrap-action s3://elasticmapreduce/bootstrap-actions/install-ganglia --bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop --args "--mapred-config-file, s3://com.versata.emr/conf/mapred-site-tuned.xml"

To add Job Steps:

Step 1:

ruby elastic-mapreduce --jobflow <jobflo_id> --jar s3://somepath/job-one.jar --arg s3://somepath/input-one --arg s3://somepath/output-one --args -m,mapred.min.split.size=52880 -m,mapred.task.timeout=0

Step2:

ruby elastic-mapreduce --jobflow <jobflo_id> --jar s3://somepath/job-two.jar --arg s3://somepath/output-one --arg s3://somepath/output-two --args -m,mapred.min.split.size=52880 -m,mapred.task.timeout=0

Now as for the Java code, There will be one Main class which would contain one implementation each of the following classes:

  • org.apache.hadoop.mapreduce.Mapper;
  • org.apache.hadoop.mapreduce.Reducer;

Each of these have to override methods map() and reduce() to do the desired job.

The Java class for problem in question would look like following:

public class SomeJob extends Configured implements Tool {

    private static final String JOB_NAME = "My Job";

    /**
     * This is Mapper.
     */
    public static class MapJob extends Mapper<LongWritable, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {

            // Get the cached file
            Path file = DistributedCache.getLocalCacheFiles(context.getConfiguration())[0];

            File fileObject = new File (file.toString());
            // Do whatever required with file data
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            outputKey.set("Some key calculated or derived");
            outputVey.set("Some Value calculated or derived");
            context.write(outputKey, outputValue);
        }
        }

    /**
     * This is Reducer.
     */
    public static class ReduceJob extends Reducer<Text, Text, Text, Text> {

    private Text outputKey = new Text();
    private Text outputValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                InterruptedException {
            outputKey.set("Some key calculated or derived");
            outputVey.set("Some Value calculated or derived");
            context.write(outputKey, outputValue);
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        try {
            Configuration conf = getConf();
            DistributedCache.addCacheFile(new URI(args[2]), conf);
            Job job = new Job(conf);

            job.setJarByClass(TaxonomyOverviewReportingStepOne.class);
            job.setJobName(JOB_NAME);

            job.setMapperClass(MapJob.class);
            job.setReducerClass(ReduceJob.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, args[0]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            boolean success = job.waitForCompletion(true);
            return success ? 0 : 1;
        } catch (Exception e) {
            e.printStackTrace();
            return 1;
        }

    }

    public static void main(String[] args) throws Exception {

        if (args.length < 3) {
            System.out
                    .println("Usage: SomeJob <comma sparated list of input directories> <output dir> <cache file>");
            System.exit(-1);
        }

        int result = ToolRunner.run(new TaxonomyOverviewReportingStepOne(), args);
        System.exit(result);
    }

}