2 votes

I am trying to write to an HBase table directly from my Mapper, using Hadoop 2.4.0 with HBase 0.94.18 on EMR.

I am getting a nasty "IOException: Pass a Delete or a Put" when executing the code below.

public class TestHBase {
  static class ImportMapper 
            extends Mapper<MyKey, MyValue, ImmutableBytesWritable, Writable> {
    private byte[] family = Bytes.toBytes("f");

    @Override
    public void map(MyKey key, MyValue value, Context context) {
      MyItem item = //do some stuff with key/value and create item
      byte[] rowKey = Bytes.toBytes(item.getKey());
      Put put = new Put(rowKey);
      for (String attr : Arrays.asList("a1", "a2", "a3")) {
        byte[] qualifier = Bytes.toBytes(attr);
        put.add(family, qualifier, Bytes.toBytes(item.get(attr)));
      }
      context.write(new ImmutableBytesWritable(rowKey), put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String input = args[0];
    String table = "table";
    Job job = Job.getInstance(conf, "stuff");

    job.setJarByClass(ImportMapper.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    FileInputFormat.setInputDirRecursive(job, true);
    FileInputFormat.addInputPath(job, new Path(input));

    TableMapReduceUtil.initTableReducerJob(
            table,                  // output table
            null,                   // reducer class
            job);
    job.setNumReduceTasks(0);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Does anyone know what I am doing wrong?

Stacktrace

Error: java.io.IOException: Pass a Delete or a Put
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125)
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84)
    at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.write(MapTask.java:646)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:775)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Container killed by the ApplicationMaster.
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

Let's see the stack trace. - candied_orange
In your provided code, context.write(new ImmutableBytesWritable(rowKey), put); is outside of the map method. Please fix that first, because it doesn't match what the traceback is showing... - Rubén Moraleda
Thanks for pointing that out, Rubén. It was a copy/paste mistake. - Marsellus Wallace

2 Answers

2 votes

It would help if you could show the full stack trace, so that I can track the problem down more easily. I have not executed your code, but from reading it, this line could be the issue:
job.setNumReduceTasks(0);

With zero reduce tasks, the mapper's output is written directly to Apache HBase, so every value the mapper emits must be a Put (or a Delete). You can either increase the number of reduce tasks, or look up the default value in the API and comment this line out.

0 votes

Thanks for adding the stack trace. Unfortunately you didn't include the code that threw the exception so I can't fully trace it for you. Instead I did a little searching around and discovered a few things for you.

Your stack trace is similar to the one in another SO question:

"Pass a Delete or a Put error in hbase mapreduce"

That one solved the issue by commenting out job.setNumReduceTasks(0);

There is a similar SO question that had the same exception but couldn't solve the problem that way. Instead it was having a problem with annotations:

"java.io.IOException: Pass a Delete or a Put" when reading HDFS and storing HBase


Here are some good examples of how to write working code both with setNumReduceTasks at 0 and at 1 or more.

Quoting section 51.2, "HBase MapReduce Read/Write Example":

The following is an example of using HBase both as a source and as a sink with MapReduce. This example will simply copy data from one table to another.

Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleReadWrite");
job.setJarByClass(MyReadWriteJob.class);    // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,      // input table
  scan,             // Scan instance to control CF and attribute selection
  MyMapper.class,   // mapper class
  null,             // mapper output key
  null,             // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,      // output table
  null,             // reducer class
  job);
job.setNumReduceTasks(0);

boolean b = job.waitForCompletion(true);
if (!b) {
    throw new IOException("error with job!");
}

This is the example with one or more reduce tasks:

Quoting section 51.4, "HBase MapReduce Summary to HBase Example":

The following example uses HBase as a MapReduce source and sink with a summarization step. This example will count the number of distinct instances of a value in a table and write those summarized counts in another table.

Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummary");
job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,        // input table
  scan,               // Scan instance to control CF and attribute selection
  MyMapper.class,     // mapper class
  Text.class,         // mapper output key
  IntWritable.class,  // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,        // output table
  MyTableReducer.class,    // reducer class
  job);
job.setNumReduceTasks(1);   // at least one, adjust as required

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}

http://hbase.apache.org/book.html#mapreduce.example

You seem to be more closely following the first example. I wanted to show that sometimes there is a reason to set the number of reduce tasks to zero.
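
One more thing worth checking: the frame org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124) in your stack trace belongs to the base Mapper class, not to your ImportMapper, and the driver you posted never calls job.setMapperClass(ImportMapper.class). If that is also true of your real code, Hadoop would be running its default pass-through mapper, which hands your raw MyKey/MyValue pairs to the table output format. The plain-Java sketch below models that situation. Every class in it is a hypothetical stand-in written for illustration (none of them are the real Hadoop or HBase classes), and the type check is only my reading of what TableOutputFormat's record writer does in 0.94.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class MapOnlyTypeCheckSketch {
    // Hypothetical stand-ins for HBase's Put and Delete mutation types.
    static class Put { final String row; Put(String row) { this.row = row; } }
    static class Delete { final String row; Delete(String row) { this.row = row; } }

    // Models the record writer's check (assumption): any value that is
    // neither a Put nor a Delete is rejected with the message from the trace.
    static class TableWriter {
        final List<Object> written = new ArrayList<>();

        void write(Object key, Object value) throws IOException {
            if (value instanceof Put || value instanceof Delete) {
                written.add(value);
            } else {
                throw new IOException("Pass a Delete or a Put");
            }
        }
    }

    // Models the default mapper: passes the input pair through unchanged.
    static class IdentityMapper {
        void map(String key, String value, TableWriter out) throws IOException {
            out.write(key, value);
        }
    }

    // Models a custom mapper that turns each input record into a Put.
    static class ImportMapper extends IdentityMapper {
        @Override
        void map(String key, String value, TableWriter out) throws IOException {
            out.write(key, new Put(key));
        }
    }

    // Runs one record through the given mapper; returns "ok" or the error message.
    static String run(IdentityMapper mapper) {
        TableWriter out = new TableWriter();
        try {
            mapper.map("row1", "raw input value", out);
            return "ok";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Mapper class never registered: the pass-through mapper runs and fails.
        System.out.println(run(new IdentityMapper()));
        // Custom mapper registered: only Put objects reach the writer.
        System.out.println(run(new ImportMapper()));
    }
}
```

If this matches what is happening in your job, adding job.setMapperClass(ImportMapper.class) to the driver should let you keep job.setNumReduceTasks(0), as in the first example from the book.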