I have a requirement to read data from a database and analyse it using Pig. I have written a UDF in Java, referring to the following link.
-- Make the custom loader (class udf.DBLoader) and its bundled dependencies available to Pig.
REGISTER /tmp/UDFJars/CassandraUDF_1-0.0.1-SNAPSHOT-jar-with-dependencies.jar;

-- Load relation A through the custom loader. The constructor arguments are, in order:
-- host, user name, password and the query to run.
A = LOAD '/user/sampleFile.txt'
    USING udf.DBLoader('10.xx.xxx.4','username','password','select * from customer limit 10')
    AS (f1 : chararray);

-- Print the loaded tuples to the console.
DUMP A;
package udf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import com.data.ConnectionCassandra;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
/**
 * Pig {@link LoadFunc} that executes a single query against Cassandra and
 * emits every row of the result as one Pig tuple (one tuple field per
 * result column, in column order).
 *
 * <p>The input location handed to {@link #setLocation(String, Job)} is only
 * registered with Hadoop's {@link FileInputFormat}; the {@link RecordReader}
 * passed to {@link #prepareToRead(RecordReader, PigSplit)} is never read.
 * The actual data comes from the query supplied to the constructor.
 *
 * <p>NOTE(review): the query is executed on every call to
 * {@code prepareToRead}. If the job ends up with more than one split, the
 * result will presumably be loaded once per split - confirm against the
 * input file / split configuration.
 */
public class DBLoader extends LoadFunc {
    private final Log log = LogFactory.getLog(getClass());

    /** Session obtained in {@code prepareToRead}; {@code null} until then. */
    Session session;

    /** Contact point passed as first argument to {@code ConnectionCassandra.connectToCassandra1}. */
    private String jdbcURL;
    private String user;
    private String pass;

    /** Index into {@link #rows} of the next row {@link #getNext()} will return. */
    private int count = 0;

    /** Query text executed in {@code prepareToRead}. */
    private String query;

    ResultSet result;

    /** All rows of the executed query, fully fetched in {@code prepareToRead}. */
    List<Row> rows;

    /** Number of columns in the result; number of fields in each emitted tuple. */
    int colSize;

    protected TupleFactory mTupleFactory = TupleFactory.getInstance();

    /** No-argument constructor; leaves the query unset, so {@code prepareToRead} will fail. */
    public DBLoader() {
    }

    /**
     * @param jdbcURL host/contact point handed to the connection helper
     * @param user    user name for the connection
     * @param pass    password for the connection
     * @param query   query whose rows are emitted as tuples
     */
    public DBLoader(String jdbcURL, String user, String pass, String query) {
        this.jdbcURL = jdbcURL;
        this.user = user;
        this.pass = pass;
        this.query = query;
    }

    /**
     * Returns a {@link TextInputFormat}. It is used only to obtain input
     * splits for the location; its records are never consumed by this loader.
     */
    @SuppressWarnings("rawtypes")
    @Override
    public InputFormat getInputFormat() throws IOException {
        log.info("Inside InputFormat");
        return new TextInputFormat();
    }

    /**
     * Returns the next result row as a tuple, or {@code null} once every row
     * fetched by {@code prepareToRead} has been returned.
     *
     * @return the next tuple, or {@code null} when the input is exhausted
     * @throws IOException declared by the {@link LoadFunc} contract
     */
    @Override
    public Tuple getNext() throws IOException {
        // Called once per row, so keep this below INFO to avoid flooding task logs.
        log.debug("Inside get Next");

        // End of input: nothing was loaded, or every row has already been emitted.
        if (rows == null || count >= rows.size()) {
            return null;
        }

        // Advance the cursor; without this the same row would be returned forever.
        Row row = rows.get(count++);
        if (row == null) {
            return null;
        }

        List<Object> fields = new ArrayList<Object>(colSize);
        for (int colNum = 0; colNum < colSize; colNum++) {
            fields.add(row.getObject(colNum));
        }
        return mTupleFactory.newTuple(fields);
    }

    /**
     * Connects to Cassandra, executes the configured query and buffers all
     * result rows in memory for {@link #getNext()}.
     *
     * @param reader record reader supplied by Pig; not used by this loader
     * @param split  split supplied by Pig; not used by this loader
     * @throws IOException if no query was configured, no session could be
     *                     obtained, or connecting/executing the query failed
     */
    @SuppressWarnings("rawtypes")
    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        log.info("Inside Prepare to Read");

        // Reset all per-read state so a failed attempt never leaves stale rows behind.
        session = null;
        result = null;
        rows = null;
        count = 0;
        colSize = 0;

        if (query == null) {
            throw new IOException("CQL query not specified");
        }

        try {
            // Never log the password; the user name is enough to diagnose login problems.
            log.info("Creating Session for user: " + user);
            session = ConnectionCassandra.connectToCassandra1(jdbcURL, user, pass);
            if (session == null) {
                throw new IOException("Could not obtain a Cassandra session for host " + jdbcURL);
            }
            log.info("Session Created");

            log.info("Executing Query " + query);
            result = session.execute(query);
            log.info("Query Executed :" + query);

            rows = result.all();
            colSize = result.getColumnDefinitions().asList().size();
        } catch (RuntimeException e) {
            // Surface unchecked failures as IOException with the cause attached,
            // so the task log shows why the load failed.
            log.error("Failed to load data for query: " + query, e);
            throw new IOException("Failed to load data for query: " + query, e);
        }
        // NOTE(review): the session is left open, as in the original code; whether
        // ConnectionCassandra shares or caches sessions is not visible here - confirm
        // before closing it at this point.
    }

    /**
     * Registers {@code location} as the job's input path.
     *
     * @throws IOException if the path cannot be set on the job
     */
    @Override
    public void setLocation(String location, Job job) throws IOException {
        log.info("Inside Set Location");
        try {
            FileInputFormat.setInputPaths(job, location);
        } catch (IOException exception) {
            // Do not swallow: a job without input paths cannot run.
            log.error("Some thing went wrong : " + exception.getMessage(), exception);
            throw exception;
        } catch (RuntimeException exception) {
            log.error("Some thing went wrong : " + exception.getMessage(), exception);
            throw new IOException("Unable to set input location: " + location, exception);
        }
    }
}
Above are my Pig script and Java code. Here, /user/sampleFile.txt is a dummy file with no data. I am getting the following exception:
Pig Stack Trace
ERROR 1066: Unable to open iterator for alias A
org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1066: Unable to open iterator for alias A at org.apache.pig.PigServer.openIterator(PigServer.java:892) at org.apache.pig.tools.grunt.GruntParser.processDump(GruntParser.java:774) at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:372) at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:198) at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:173) at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:84) at org.apache.pig.Main.run(Main.java:484) at org.apache.pig.Main.main(Main.java:158) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.util.RunJar.run(RunJar.java:221) at org.apache.hadoop.util.RunJar.main(RunJar.java:136) Caused by: java.io.IOException: Job terminated with anomalous status FAILED at org.apache.pig.PigServer.openIterator(PigServer.java:884)