1
votes

Can somebody give one good example link for mapreduce with Hbase? My requirement is run mapreduce on hdfs file and store reducer output to hbase table. Mapper input will be hdfs file and output will be Text,IntWritable key value pairs. Reducers output will be Put object ie add reducer Iterable IntWritable values and store in hbase table.

2

2 Answers

4
votes

Here is the code which will solve your problem



Driver

HBaseConfiguration conf =  HBaseConfiguration.create();
Job job = new Job(conf,"JOB_NAME");
    job.setJarByClass(yourclass.class);
    job.setMapperClass(yourMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Intwritable.class);
    FileInputFormat.setInputPaths(job, new Path(inputPath));
    TableMapReduceUtil.initTableReducerJob(TABLE,
            yourReducer.class, job);
    job.setReducerClass(yourReducer.class);
            job.waitForCompletion(true);


Mapper&Reducer

class yourMapper extends Mapper<LongWritable, Text, Text,IntWritable> {
//@overide map()
 }

class yourReducer
        extends
        TableReducer<Text, IntWritable, 
        ImmutableBytesWritable>
{
//@override rdeuce()
}

0
votes

**Ckeck the bellow code that works fine for me with Phoenix Hbase and map reduce **

This program will read data from Hbase table and inset result in to another table after map-reduce job .

Table :-> STOCK ,STOCK_STATS

StockComputationJob.java

public static class StockMapper extends Mapper<NullWritable, StockWritable, Text , DoubleWritable> {

    private Text stock = new Text(); 
    private DoubleWritable price = new DoubleWritable ();

    @Override
    protected void map(NullWritable key, StockWritable stockWritable, Context context) throws IOException, InterruptedException {
       double[] recordings = stockWritable.getRecordings();
       final String stockName = stockWritable.getStockName();
       System.out.println("Map-"+recordings);
       double maxPrice = Double.MIN_VALUE;
       for(double recording : recordings) {
           System.out.println("M-"+key+"-"+recording);
         if(maxPrice < recording) {
          maxPrice = recording;
             }
       }
       System.out.println(stockName+"--"+maxPrice);
       stock.set(stockName);
       price.set(maxPrice);
       context.write(stock,price);
    }

}

    public static void main(String[] args) throws Exception {

         final Configuration conf = new Configuration();
         HBaseConfiguration.addHbaseResources(conf);
         conf.set(HConstants.ZOOKEEPER_QUORUM, zkUrl);
         final Job job = Job.getInstance(conf, "stock-stats-job");
      // We can either specify a selectQuery or ignore it when we would like to retrieve all the columns
         final String selectQuery = "SELECT STOCK_NAME,RECORDING_YEAR,RECORDINGS_QUARTER FROM STOCK ";

         // StockWritable is the DBWritable class that enables us to process the Result of the above query
         PhoenixMapReduceUtil.setInput(job,StockWritable.class,"STOCK",selectQuery);  

         // Set the target Phoenix table and the columns
         PhoenixMapReduceUtil.setOutput(job, "STOCK_STATS", "STOCK_NAME,MAX_RECORDING");

         job.setMapperClass(StockMapper.class);
         job.setReducerClass(StockReducer.class); 
         job.setOutputFormatClass(PhoenixOutputFormat.class);

         job.setMapOutputKeyClass(Text.class);
         job.setMapOutputValueClass(DoubleWritable.class);
         job.setOutputKeyClass(NullWritable.class);
         job.setOutputValueClass(StockWritable.class); 
         TableMapReduceUtil.addDependencyJars(job);
         job.waitForCompletion(true);
     }

}

StockReducer.java

    public class StockReducer extends Reducer<Text, DoubleWritable, NullWritable , StockWritable> {

     protected void reduce(Text key, Iterable<DoubleWritable> recordings, Context context) throws IOException, InterruptedException {
          double maxPrice = Double.MIN_VALUE;
          System.out.println(recordings);
          for(DoubleWritable recording : recordings) {
              System.out.println("R-"+key+"-"+recording);
            if(maxPrice < recording.get()) {
             maxPrice = recording.get(); 
            }
          } 
          final StockWritable stock = new StockWritable();
          stock.setStockName(key.toString());
          stock.setMaxPrice(maxPrice);
          System.out.println(key+"--"+maxPrice);
          context.write(NullWritable.get(),stock);
        }


}

StockWritable.java

public class StockWritable  implements DBWritable,Writable {

      private String stockName;

        private int year;

        private double[] recordings;

        private double maxPrice;   

        public void readFields(DataInput input) throws IOException {

        }

        public void write(DataOutput output) throws IOException {

        }

        public void readFields(ResultSet rs) throws SQLException {
           stockName = rs.getString("STOCK_NAME");
           setYear(rs.getInt("RECORDING_YEAR"));
           final Array recordingsArray = rs.getArray("RECORDINGS_QUARTER");
           setRecordings((double[])recordingsArray.getArray());
        }

        public void write(PreparedStatement pstmt) throws SQLException {
           pstmt.setString(1, stockName);
           pstmt.setDouble(2, maxPrice); 
        }

        public int getYear() {
            return year;
        }

        public void setYear(int year) {
            this.year = year;
        }

        public double[] getRecordings() {
            return recordings;
        }

        public void setRecordings(double[] recordings) {
            this.recordings = recordings;
        }

        public double getMaxPrice() {
            return maxPrice;
        }

        public void setMaxPrice(double maxPrice) {
            this.maxPrice = maxPrice;
        }

        public String getStockName() {
            return stockName;
        }

        public void setStockName(String stockName) {
            this.stockName = stockName;
        }


}