
I am attempting to write a Pig script that will allow me to load JSON (taken from Elasticsearch and dumped onto HDFS).

I have been struggling with this for a number of days; perhaps someone can give me some insight into the issues I am experiencing.

This is a quick Pig script I wrote to read from HBase, modify the data arbitrarily, and then store it back into HBase (just to make sure that everything works):

REGISTER hbase-common-1.1.1.jar
REGISTER /tmp/udfs/json-simple-1.1.1.jar
REGISTER /tmp/udfs/elephant-bird-hadoop-compat-4.9.jar
REGISTER /tmp/udfs/elephant-bird-pig-4.9.jar
REGISTER /user/hdfs/share/libs/guava-11.0.jar
REGISTER /user/hdfs/share/libs/zookeeper-

set hbase.zookeeper.quorum 'list of servers';    

raw = LOAD 'hbase://esimporttest' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('esinfo:a', '-loadKey true -limit 5') AS (id:bytearray, a:chararray);
keys = FOREACH raw GENERATE id, CONCAT(a, '1');

keys = LIMIT keys 1;

STORE keys INTO 'hbase://esimporttest' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('esinfo:id esinfo:a');    

Running this script reads data from HBase and stores it back into HBase, and it runs perfectly.

I then modified the script to load the data from a JSON file rather than from HBase:

REGISTER hbase-common-1.1.1.jar
REGISTER /tmp/udfs/json-simple-1.1.1.jar
REGISTER /tmp/udfs/elephant-bird-hadoop-compat-4.9.jar
REGISTER /tmp/udfs/elephant-bird-pig-4.9.jar
REGISTER /user/hdfs/share/libs/guava-11.0.jar
REGISTER /user/hdfs/share/libs/zookeeper-

set hbase.zookeeper.quorum 'list of servers';

raw_data = LOAD '/user/hdfs/input/EsImports/2014-04-22.json' using com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad') as (json:map[]); 
keys = FOREACH raw_data GENERATE
    json#'sid' as id:bytearray,
    json#'userAgent' as a:chararray;

limit_keys = LIMIT keys 1;

STORE limit_keys INTO 'hbase://esimporttest' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('esinfo:id esinfo:a');

This is the script that fails. I have a feeling it has something to do with the schema of the data being loaded, but when I perform a DESCRIBE and a DUMP on the data, it all seems to have the exact same structure.

Furthermore, the error I get when the script fails is as follows:

ERROR 2244: Job job_1439978375936_0215 failed, hadoop does not return any error message

Full error log

Log Type: syslog
Log Upload Time: Mon Aug 24 13:28:43 +0200 2015
Log Length: 4121
2015-08-24 13:28:35,504 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Created MRAppMaster for application appattempt_1439978375936_0238_000001
2015-08-24 13:28:35,910 WARN [main] org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2015-08-24 13:28:35,921 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Executing with tokens:
2015-08-24 13:28:35,921 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Kind: YARN_AM_RM_TOKEN, Service: , Ident: (appAttemptId { application_id { id: 238 cluster_timestamp: 1439978375936 } attemptId: 1 } keyId: 176959833)
2015-08-24 13:28:36,056 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Kind: mapreduce.job, Service: job_1439978375936_0236, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@331fef77)
2015-08-24 13:28:36,057 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Kind: RM_DELEGATION_TOKEN, Service: {ip removed}, Ident: (owner=darryn, renewer=mr token, realUser=hcat, issueDate=1440415651774, maxDate=1441020451774, sequenceNumber=176, masterKeyId=149)
2015-08-24 13:28:36,070 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Using mapred newApiCommitter.
2015-08-24 13:28:36,699 WARN [main] org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2015-08-24 13:28:36,804 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: OutputCommitter set in config null
2015-08-24 13:28:36,950 FATAL [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Error starting MRAppMaster
java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/mapreduce/TableInputFormat
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at org.apache.pig.impl.PigContext.resolveClassName(PigContext.java:657)
    at org.apache.pig.impl.PigContext.instantiateFuncFromSpec(PigContext.java:726)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore.getStoreFunc(POStore.java:251)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter.getCommitters(PigOutputCommitter.java:88)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter.<init>(PigOutputCommitter.java:71)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat.getOutputCommitter(PigOutputFormat.java:289)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster$1.call(MRAppMaster.java:470)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster$1.call(MRAppMaster.java:452)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.callWithJobClassLoader(MRAppMaster.java:1541)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.createOutputCommitter(MRAppMaster.java:452)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.serviceInit(MRAppMaster.java:371)
    at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster$4.run(MRAppMaster.java:1499)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.initAndStartAppMaster(MRAppMaster.java:1496)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.main(MRAppMaster.java:1429)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.TableInputFormat
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 20 more
2015-08-24 13:28:36,954 INFO [main] org.apache.hadoop.util.ExitUtil: Exiting with status 1


I have noticed some interesting behavior: if I save the data in the alias using PigStorage with the -schema option, and then load that file back up in a separate script (still with PigStorage), I can insert directly into HBase. This leads me to suspect it has something to do with how the schema is being stored.


1 Answer


The solution I used in the end is by no means optimal, but it works just fine.

After reading in your data from the JSON files and generating your schema, save it back to a file using PigStorage and then read that file back in.

fs -rm -r /tmp/estest2
Store test into '/tmp/estest2' USING PigStorage('\t', '-schema');

processed_data = LOAD '/tmp/estest2' USING PigStorage('\t'); 

-- Used to sync the script and allow it to finish up until this point
EXEC;
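
As a side note, the cleanup step might be made a little less fragile with Pig's `rmf` command, which should not fail when the path is missing. The following is only a sketch under assumptions: the input file `/tmp/example_rows.tsv` and its two-column schema are hypothetical stand-ins for the relation built from the JSON data, and I have placed `EXEC` between the `STORE` and the reload, which I have not verified behaves identically to the ordering above.

```pig
-- Hypothetical stand-in for the relation produced from the JSON input.
test = LOAD '/tmp/example_rows.tsv' USING PigStorage('\t')
       AS (id:chararray, a:chararray);

-- rmf removes the directory if it exists and does not fail if it is missing.
rmf /tmp/estest2

-- Write the data together with a .pig_schema file.
STORE test INTO '/tmp/estest2' USING PigStorage('\t', '-schema');

-- Run everything queued so far, so the output exists before it is reloaded.
EXEC;

-- Reload the data that was just written.
processed_data = LOAD '/tmp/estest2' USING PigStorage('\t');
```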

What I suspect is happening is that the type the Elephant Bird JsonLoader uses is misinterpreted by HBaseStorage, whereas it does understand the types PigStorage produces and can therefore load the data into HBase.

There were a couple of other things I found while doing this. You need an 'id' field as the first field in your alias, but you must not specify it in the column list you pass to HBaseStorage; it appears to be used as the row key.

A simplified working script using this solution would look like this:
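
To illustrate the 'id' point: as far as I understand HBaseStorage, when storing it takes the first field of each tuple as the row key and maps the remaining fields, in order, onto the columns listed in its constructor. A minimal sketch, assuming a hypothetical tab-separated input file and a hypothetical second column `esinfo:b` that is not part of my actual table:

```pig
-- Hypothetical input: three tab-separated columns.
rows = LOAD '/tmp/example_rows.tsv' USING PigStorage('\t')
       AS (id:chararray, a:chararray, b:chararray);

-- 'id' (the first field) becomes the row key and is NOT listed below;
-- 'a' and 'b' are written, in order, to esinfo:a and esinfo:b.
STORE rows INTO 'hbase://esimporttest'
    USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('esinfo:a esinfo:b');
```

So a relation with N fields should be paired with N-1 column names.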

REGISTER hbase-common-1.1.1.jar
REGISTER /tmp/udfs/json-simple-1.1.1.jar
REGISTER /tmp/udfs/elephant-bird-hadoop-compat-4.9.jar
REGISTER /tmp/udfs/elephant-bird-pig-4.9.jar
REGISTER /user/hdfs/share/libs/guava-11.0.jar
REGISTER /user/hdfs/share/libs/zookeeper-

set hbase.zookeeper.quorum 'list of servers';

raw_data = LOAD '/user/hdfs/input/EsImports/2014-04-22.json' using com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad') as (json:map[]); 
keys = FOREACH raw_data GENERATE
    json#'sid' as id:bytearray, -- id is not listed in the HBaseStorage call but is still used
    json#'userAgent' as a:chararray;

limit_keys = LIMIT keys 1;

-- This is super hacky but works.
-- Note: the fs -rm below fails if the directory does not exist.
fs -rm -r /tmp/estest2
Store limit_keys into '/tmp/estest2' USING PigStorage('\t', '-schema');
processed_data = LOAD '/tmp/estest2' USING PigStorage('\t'); 

-- Used to sync the script and allow it to finish up until this point before starting to insert into HBase
EXEC;

STORE processed_data INTO 'hbase://esimporttest' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('esinfo:a');