
I want to write a Java program that reads input from HDFS, processes it using MapReduce, and writes the output into MongoDB.

Here is the scenario:

  1. I have a Hadoop cluster with 3 DataNodes.
  2. A Java program reads the input from HDFS and processes it using MapReduce.
  3. Finally, it writes the result into MongoDB.

Reading from HDFS and processing the data with MapReduce are simple, but I get stuck on writing the result into MongoDB. Is there a Java API for writing the result into MongoDB? My other question: since this is a Hadoop cluster, we don't know which DataNode will run the Reducer task and generate the result. Is it possible to write the result into a MongoDB instance installed on a specific server?

If I write the result into HDFS, the code looks like this:

@Override
public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException 
{
    long sum = 0;
    for (LongWritable value : values) 
    {
        sum += value.get();
    }

    context.write(new Text(key), new LongWritable(sum));
}

Now I want to write the result into MongoDB instead of HDFS. How can I do that?

What do you mean, you get stuck writing the result into MongoDB? MongoDB has a Java API available (docs.mongodb.org/ecosystem/drivers/java). - WiredPrairie
Using the mongo-java-driver in the Reducer class? Will that work? - user2597504
Why not try it? (And why are you copying data from HDFS into MongoDB?) - WiredPrairie
Probably to be able to query it. Crazy, I know. - evanchooly
Hey @user2597504, could you tell me how it worked (or how you solved this issue)? Your question is exactly what I was curious about. - Hoon

3 Answers

2
votes

You want the MongoDB Connector for Hadoop. See its examples.

It's tempting to just add code to your Reducer that inserts data into your database as a side effect. Avoid this temptation. One reason to use a connector rather than inserting data as a side effect of your Reducer is speculative execution: Hadoop can sometimes run two copies of the same reduce task in parallel, which can lead to extraneous inserts and duplicate data.
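The duplicate-data problem described above can be illustrated without a cluster. The sketch below is a plain-Java simulation (no Hadoop or MongoDB involved): a List stands in for a collection that receives bare inserts, and a HashMap stands in for one that receives upserts keyed on the reduce key (as if the key were the document's _id). The class and method names are hypothetical. Keyed upserts are a different mitigation from the connector this answer recommends; they make a repeated write harmless rather than preventing it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation: the same reduce output is written once per task
// attempt, as happens with speculative execution or a retried attempt.
class SpeculativeWriteDemo {

    // Bare insert: every write appends a new "document", so a second attempt
    // duplicates everything the first attempt wrote.
    static List<Map.Entry<String, Long>> insertEach(Map<String, Long> reduceOutput, int attempts) {
        List<Map.Entry<String, Long>> collection = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            for (Map.Entry<String, Long> e : reduceOutput.entrySet()) {
                collection.add(Map.entry(e.getKey(), e.getValue()));
            }
        }
        return collection;
    }

    // Upsert keyed on the reduce key: a repeated write replaces the earlier
    // one, so extra attempts leave the stored result unchanged.
    static Map<String, Long> upsertEach(Map<String, Long> reduceOutput, int attempts) {
        Map<String, Long> collection = new HashMap<>();
        for (int i = 0; i < attempts; i++) {
            for (Map.Entry<String, Long> e : reduceOutput.entrySet()) {
                collection.put(e.getKey(), e.getValue());
            }
        }
        return collection;
    }

    public static void main(String[] args) {
        Map<String, Long> reduceOutput = Map.of("alice", 3L, "bob", 5L);
        // Two attempts of the same task: the original plus a speculative copy.
        System.out.println(insertEach(reduceOutput, 2).size()); // 4
        System.out.println(upsertEach(reduceOutput, 2).size()); // 2
    }
}
```

Requires Java 9 or later for Map.of and Map.entry.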

0
votes

Yes. You write to MongoDB as usual. Whether your MongoDB deployment runs on shards is a detail hidden from you.
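A minimal sketch of writing to MongoDB directly from a reducer with the Java driver, assuming mongodb-driver-sync 3.7 or later and the org.apache.hadoop.mapreduce API. The configuration key example.mongo.uri, the class name MongoSumReducer, the database/collection names, and the sum field are all hypothetical. Because the URI names one specific server, it does not matter which node runs the reducer: every task connects to the same host. Writing as a side effect is what the connector-based answer warns against, so this sketch upserts on _id, which makes a duplicate attempt overwrite the same document instead of inserting a second one.

```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.Document;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOptions;

// Hypothetical reducer that writes straight to MongoDB and emits nothing
// to the job's own OutputFormat.
class MongoSumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    // Hypothetical configuration key, set by the driver program, e.g.
    // conf.set("example.mongo.uri", "mongodb://192.168.0.1:27017");
    static final String URI_KEY = "example.mongo.uri";

    private MongoClient client;
    private MongoCollection<Document> collection;

    @Override
    protected void setup(Context context) {
        // One client per task; every task connects to the same server.
        client = MongoClients.create(context.getConfiguration().get(URI_KEY));
        collection = client.getDatabase("mydbName").getCollection("myCollectionName");
    }

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) {
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        // Upsert keyed on _id: a speculative or retried attempt replaces the
        // same document rather than adding a duplicate.
        String id = key.toString();
        Document doc = new Document("_id", id).append("sum", sum);
        collection.replaceOne(Filters.eq("_id", id), doc, new ReplaceOptions().upsert(true));
    }

    @Override
    protected void cleanup(Context context) {
        if (client != null) {
            client.close();
        }
    }
}
```

Since the reducer writes nothing through the Context, the job could use org.apache.hadoop.mapreduce.lib.output.NullOutputFormat. Turning off speculative reduce attempts (mapreduce.reduce.speculative=false in Hadoop 2) reduces duplicate attempts, but a failed attempt can still be retried, so the keyed upsert remains useful.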

0
votes

I spent my morning implementing the same scenario. Here is my solution:

Create three classes:

  • Experiment.java: job configuration and submission
  • MyMapper.java: mapper class
  • MyReducer.java: reducer class

    package org.example;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.mongodb.hadoop.io.BSONWritable;
    import com.mongodb.hadoop.mapred.MongoOutputFormat;
    
    public class Experiment extends Configured implements Tool {
    
        @Override
        public int run(final String[] args) throws Exception {
            final Configuration conf = getConf();
            // args[1]: target collection, e.g. mongodb://host:27017/db.collection
            conf.set("mongo.output.uri", args[1]);
    
            final JobConf job = new JobConf(conf);
    
            // args[0]: input path (or glob) in HDFS
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            job.setJarByClass(Experiment.class);
    
            job.setInputFormat(TextInputFormat.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            // Reducer output goes to mongo.output.uri instead of HDFS
            job.setOutputFormat(MongoOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BSONWritable.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            // Blocks until the job finishes; throws an IOException if it fails
            JobClient.runJob(job);
    
            return 0;
        }
    
        public static void main(final String[] args) throws Exception {
            int res = ToolRunner.run(new Experiment(), args);
            System.exit(res);
        }
    }
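The answer lists MyMapper.java and MyReducer.java but does not show them. A minimal sketch of what they might look like, assuming the old org.apache.hadoop.mapred API that Experiment uses and the mongo-hadoop connector's BSONWritable. The mapping logic (counting lines per first tab-separated field) and the count field name are hypothetical; the types match the Experiment configuration (map output Text/IntWritable, job output Text/BSONWritable). Both classes are shown in one listing for brevity; in the answer's layout each would be a public class in its own file in org.example.

```java
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.bson.BasicBSONObject;

import com.mongodb.hadoop.io.BSONWritable;

// Hypothetical mapper: emits (first tab-separated field of the line, 1).
class MyMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text outKey = new Text();

    @Override
    public void map(LongWritable offset, Text line,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String[] fields = line.toString().split("\t", 2);
        if (fields[0].isEmpty()) {
            return; // skip blank lines
        }
        outKey.set(fields[0]);
        output.collect(outKey, ONE);
    }
}

// Sums the counts and wraps the total in a BSON document for MongoOutputFormat.
class MyReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, BSONWritable> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, BSONWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new BSONWritable(new BasicBSONObject("count", sum)));
    }
}
```

How the Text key is stored in the resulting document depends on the connector version, so check the mongo-hadoop documentation for the release you use.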
    

When you run the Experiment class on your cluster, pass two parameters: the first is your input location in HDFS, and the second is the MongoDB URI of the collection that will hold your results. Here is an example call, assuming Experiment.java is in the package org.example:

sudo -u hdfs hadoop jar ~/jar/myexample.jar org.example.Experiment 'myfilesinhdfs/*' mongodb://192.168.0.1:27017/mydbName.myCollectionName
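The second parameter follows the mongodb://host:port/database.collection pattern. A small, hypothetical pre-flight check in plain Java (java.net.URI only) that splits it into its parts before the job is submitted. It handles a single host only, because java.net.URI does not treat a comma-separated host list as a server address, and it splits at the first dot because MongoDB database names cannot contain dots while collection names can.

```java
import java.net.URI;

// Hypothetical helper: splits a URI such as
// mongodb://192.168.0.1:27017/mydbName.myCollectionName into its parts.
class MongoOutputUri {
    final String host;
    final int port;
    final String database;
    final String collection;

    MongoOutputUri(String uri) {
        URI parsed = URI.create(uri);
        if (!"mongodb".equals(parsed.getScheme())) {
            throw new IllegalArgumentException("expected a mongodb:// URI: " + uri);
        }
        host = parsed.getHost();
        if (host == null) {
            throw new IllegalArgumentException("expected a single host in: " + uri);
        }
        // 27017 is MongoDB's default port
        port = parsed.getPort() == -1 ? 27017 : parsed.getPort();

        String path = parsed.getPath() == null ? "" : parsed.getPath();
        if (path.startsWith("/")) {
            path = path.substring(1);
        }
        // Database names cannot contain '.', so the first dot separates them
        int dot = path.indexOf('.');
        if (dot <= 0 || dot == path.length() - 1) {
            throw new IllegalArgumentException("expected /database.collection in: " + uri);
        }
        database = path.substring(0, dot);
        collection = path.substring(dot + 1);
    }
}
```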

This might not be the best way, but it does the job for me.