Can somebody point me to a good example of MapReduce with HBase? My requirement is to run MapReduce on an HDFS file and store the reducer output in an HBase table. The mapper input will be the HDFS file, and the mapper output will be (Text, IntWritable) key-value pairs. The reducer output will be a Put object, i.e. the reducer adds up the Iterable<IntWritable> values and stores the result in the HBase table.
2 Answers
4
votes
Here is the code which will solve your problem
Driver
// Driver: runs a plain Mapper over an HDFS input path and sends the reducer
// output to an HBase table.

// HBaseConfiguration.create() returns a Hadoop Configuration (with the HBase
// resources loaded), so the variable must be typed as Configuration.
Configuration conf = HBaseConfiguration.create();
// Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
Job job = Job.getInstance(conf, "JOB_NAME");
job.setJarByClass(yourclass.class);
job.setMapperClass(yourMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(inputPath));
// initTableReducerJob wires the target table and also registers yourReducer
// as the job's reducer class, so no separate setReducerClass call is needed.
TableMapReduceUtil.initTableReducerJob(TABLE, yourReducer.class, job);
job.waitForCompletion(true);
Mapper&Reducer
/**
 * Mapper skeleton: takes (LongWritable, Text) input pairs and produces
 * (Text, IntWritable) output pairs.
 */
class yourMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // @Override map() goes here
}
/**
 * Reducer skeleton: receives (Text, IntWritable) pairs from the mapper and
 * uses ImmutableBytesWritable as the output key type of the TableReducer.
 */
class yourReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    // @Override reduce() goes here
}
0
votes
**Check the code below, which works fine for me with Phoenix, HBase and MapReduce.**
This program reads data from an HBase table and inserts the result into another table after the MapReduce job.
Tables: STOCK, STOCK_STATS
StockComputationJob.java
/**
 * Mapper that emits, for every {@code StockWritable} it receives, the stock
 * name paired with the largest value in that record's recordings array.
 *
 * <p>Input: ({@code NullWritable}, {@code StockWritable}).
 * Output: ({@code Text} stock name, {@code DoubleWritable} maximum recording).
 */
public static class StockMapper extends Mapper<NullWritable, StockWritable, Text, DoubleWritable> {

    // Output holders, refilled for every record that is written.
    private final Text stock = new Text();
    private final DoubleWritable price = new DoubleWritable();

    /**
     * Finds the maximum recording of one stock record and writes it to the context.
     * Records without any recordings are skipped, because no maximum exists for them.
     *
     * @param key           input key, only used in the debug output
     * @param stockWritable record providing the stock name and its recordings
     * @param context       receives the (stock name, maximum recording) pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void map(NullWritable key, StockWritable stockWritable, Context context)
            throws IOException, InterruptedException {
        final double[] recordings = stockWritable.getRecordings();
        final String stockName = stockWritable.getStockName();
        // Arrays.toString prints the values; concatenating the array itself
        // would only print its type and identity hash.
        System.out.println("Map-" + java.util.Arrays.toString(recordings));

        if (recordings == null || recordings.length == 0) {
            return;
        }

        // Double.MIN_VALUE is the smallest POSITIVE double, so it is not a valid
        // lower bound: negative or zero recordings would never replace it.
        double maxPrice = Double.NEGATIVE_INFINITY;
        for (double recording : recordings) {
            System.out.println("M-" + key + "-" + recording);
            if (maxPrice < recording) {
                maxPrice = recording;
            }
        }

        System.out.println(stockName + "--" + maxPrice);
        stock.set(stockName);
        price.set(maxPrice);
        context.write(stock, price);
    }
}
/**
 * Configures and runs the "stock-stats-job": reads rows of the STOCK table
 * through Phoenix, maps and reduces them with {@code StockMapper} and
 * {@code StockReducer}, and writes STOCK_NAME / MAX_RECORDING into STOCK_STATS.
 *
 * <p>Terminates the JVM with exit status 1 when the job does not complete
 * successfully, so that a failure is visible to the calling shell or scheduler.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if the job cannot be configured or submitted
 */
public static void main(String[] args) throws Exception {
    final Configuration conf = new Configuration();
    HBaseConfiguration.addHbaseResources(conf);
    // NOTE(review): zkUrl is not defined in the visible code; it has to be
    // provided by the enclosing class.
    conf.set(HConstants.ZOOKEEPER_QUORUM, zkUrl);
    final Job job = Job.getInstance(conf, "stock-stats-job");

    // We can either specify a selectQuery or ignore it when we would like to retrieve all the columns
    final String selectQuery = "SELECT STOCK_NAME,RECORDING_YEAR,RECORDINGS_QUARTER FROM STOCK ";

    // StockWritable is the DBWritable class that enables us to process the Result of the above query
    PhoenixMapReduceUtil.setInput(job, StockWritable.class, "STOCK", selectQuery);

    // Set the target Phoenix table and the columns
    PhoenixMapReduceUtil.setOutput(job, "STOCK_STATS", "STOCK_NAME,MAX_RECORDING");

    job.setMapperClass(StockMapper.class);
    job.setReducerClass(StockReducer.class);
    job.setOutputFormatClass(PhoenixOutputFormat.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(DoubleWritable.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(StockWritable.class);
    TableMapReduceUtil.addDependencyJars(job);

    // waitForCompletion returns false when the job failed; do not let that
    // pass silently with a zero exit status.
    final boolean succeeded = job.waitForCompletion(true);
    if (!succeeded) {
        System.exit(1);
    }
}
}
StockReducer.java
/**
 * Reducer that collapses all recordings emitted for one stock name into a
 * single {@code StockWritable} carrying that name and the maximum recording.
 *
 * <p>Input: ({@code Text} stock name, {@code DoubleWritable} recordings).
 * Output: ({@code NullWritable}, {@code StockWritable}).
 */
public class StockReducer extends Reducer<Text, DoubleWritable, NullWritable, StockWritable> {

    /**
     * Computes the maximum of {@code recordings} and writes it, together with
     * the stock name, to the context. Intended to override {@code Reducer.reduce}.
     *
     * @param key        stock name
     * @param recordings values emitted by the mapper for this stock
     * @param context    receives the resulting {@code StockWritable}
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    protected void reduce(Text key, Iterable<DoubleWritable> recordings, Context context)
            throws IOException, InterruptedException {
        // Double.MIN_VALUE is the smallest POSITIVE double; seeding the maximum
        // with it gives a wrong result whenever every recording is <= 0.
        double maxPrice = Double.NEGATIVE_INFINITY;
        // Prints the iterable's own toString(), not its elements.
        System.out.println(recordings);
        for (DoubleWritable recording : recordings) {
            System.out.println("R-" + key + "-" + recording);
            final double value = recording.get();
            if (maxPrice < value) {
                maxPrice = value;
            }
        }

        final StockWritable stock = new StockWritable();
        stock.setStockName(key.toString());
        stock.setMaxPrice(maxPrice);
        System.out.println(key + "--" + maxPrice);
        context.write(NullWritable.get(), stock);
    }
}
StockWritable.java
/**
 * Value object for one stock record. It can be populated from a JDBC
 * {@code ResultSet} (columns STOCK_NAME, RECORDING_YEAR, RECORDINGS_QUARTER),
 * bound to a {@code PreparedStatement} (stock name, maximum price), and
 * serialized through {@code DataOutput} / {@code DataInput}.
 */
public class StockWritable implements DBWritable, Writable {

    private String stockName;
    private int year;
    private double[] recordings;
    private double maxPrice;

    /**
     * Restores every field from the binary form produced by {@link #write(DataOutput)}.
     *
     * @param input stream to read from
     * @throws IOException if reading fails
     */
    public void readFields(DataInput input) throws IOException {
        stockName = input.readBoolean() ? input.readUTF() : null;
        year = input.readInt();
        maxPrice = input.readDouble();
        // A negative length marks a null recordings array.
        final int count = input.readInt();
        if (count < 0) {
            recordings = null;
        } else {
            final double[] values = new double[count];
            for (int i = 0; i < count; i++) {
                values[i] = input.readDouble();
            }
            recordings = values;
        }
    }

    /**
     * Writes every field in binary form. Previously this method wrote nothing,
     * which silently dropped the object's state whenever it was serialized.
     *
     * @param output stream to write to
     * @throws IOException if writing fails
     */
    public void write(DataOutput output) throws IOException {
        // Presence flag, because writeUTF cannot encode null.
        output.writeBoolean(stockName != null);
        if (stockName != null) {
            output.writeUTF(stockName);
        }
        output.writeInt(year);
        output.writeDouble(maxPrice);
        if (recordings == null) {
            output.writeInt(-1);
        } else {
            output.writeInt(recordings.length);
            for (double recording : recordings) {
                output.writeDouble(recording);
            }
        }
    }

    /**
     * Populates stock name, year and recordings from the current row of {@code rs}.
     * An SQL NULL in RECORDINGS_QUARTER results in an empty recordings array
     * instead of a {@code NullPointerException}.
     *
     * @param rs result set positioned on the row to read
     * @throws SQLException if a column cannot be read
     */
    public void readFields(ResultSet rs) throws SQLException {
        stockName = rs.getString("STOCK_NAME");
        setYear(rs.getInt("RECORDING_YEAR"));
        final Array recordingsArray = rs.getArray("RECORDINGS_QUARTER");
        if (recordingsArray == null) {
            setRecordings(new double[0]);
        } else {
            setRecordings((double[]) recordingsArray.getArray());
        }
    }

    /**
     * Binds the stock name to parameter 1 and the maximum price to parameter 2.
     *
     * @param pstmt statement to bind the values to
     * @throws SQLException if binding fails
     */
    public void write(PreparedStatement pstmt) throws SQLException {
        pstmt.setString(1, stockName);
        pstmt.setDouble(2, maxPrice);
    }

    /** @return the recording year */
    public int getYear() {
        return year;
    }

    /** @param year the recording year */
    public void setYear(int year) {
        this.year = year;
    }

    /** @return the recordings array (the internal array, not a copy) */
    public double[] getRecordings() {
        return recordings;
    }

    /** @param recordings the recordings array; stored as-is, not copied */
    public void setRecordings(double[] recordings) {
        this.recordings = recordings;
    }

    /** @return the maximum price */
    public double getMaxPrice() {
        return maxPrice;
    }

    /** @param maxPrice the maximum price */
    public void setMaxPrice(double maxPrice) {
        this.maxPrice = maxPrice;
    }

    /** @return the stock name */
    public String getStockName() {
        return stockName;
    }

    /** @param stockName the stock name */
    public void setStockName(String stockName) {
        this.stockName = stockName;
    }
}