I am using HBase 1.2 and would like to run a MapReduce job over HBase using multiple scans. The API provides this method:
TableMapReduceUtil.initTableMapperJob(List<Scan> scans, Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, org.apache.hadoop.mapreduce.Job job)
But how do I specify the table for each scan? I am using the code below:
List<Scan> scans = new ArrayList<>();
for (String firstPart : firstParts) {
    Scan scan = new Scan();
    scan.setRowPrefixFilter(Bytes.toBytes(firstPart));
    scan.setCaching(500);
    scan.setCacheBlocks(false);
    scans.add(scan);
}
TableMapReduceUtil.initTableMapperJob(scans, MyMapper.class, Text.class, Text.class, job);
It throws the following exception:
Exception in thread "main" java.lang.NullPointerException
at org.apache.hadoop.hbase.TableName.valueOf(TableName.java:436)
at org.apache.hadoop.hbase.mapreduce.TableInputFormat.initialize(TableInputFormat.java:184)
at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:241)
at org.apache.hadoop.hbase.mapreduce.TableInputFormat.getSplits(TableInputFormat.java:240)
at org.apache.hadoop.mapreduce.lib.input.DelegatingInputFormat.getSplits(DelegatingInputFormat.java:115)
at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:305)
at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:322)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:200)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1307)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1304)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1714)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1304)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1325)
I think this is expected, since the table that each scan should be applied to is not specified anywhere.
But how do I specify it?
I tried adding:
scan.setAttribute("scan.attributes.table.name", Bytes.toBytes("my_table"));
but I get the same error.