
I have a requirement to read data from a database and analyse it using Pig. I have written a UDF in Java, referring to the following link

register /tmp/UDFJars/CassandraUDF_1-0.0.1-SNAPSHOT-jar-with-dependencies.jar;
A = Load '/user/sampleFile.txt' using udf.DBLoader('10.xx.xxx.4','username','password','select * from customer limit 10') as (f1 : chararray);
DUMP A;


package udf;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import com.data.ConnectionCassandra;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class DBLoader extends LoadFunc {
    private final Log log = LogFactory.getLog(getClass());
    Session session;
    private ArrayList mProtoTuple = null;
    private String jdbcURL;
    private String user;
    private String pass;
    private int count = 0;
    private String query;
    ResultSet result;
    List<Row> rows;
    int colSize;
    protected TupleFactory mTupleFactory = TupleFactory.getInstance();

    public DBLoader() {
    }

    public DBLoader(String jdbcURL, String user, String pass, String query) {

        this.jdbcURL = jdbcURL;
        this.user = user;
        this.pass = pass;
        this.query = query;

    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        log.info("Inside InputFormat");
        // TODO Auto-generated method stub
        try {
            return new TextInputFormat();
        } catch (Exception exception) {
            log.error(exception.getMessage());
            log.error(exception.fillInStackTrace());
            throw new IOException();
        }
    }

    @Override
    public Tuple getNext() throws IOException {
        log.info("Inside get Next");
        Row row = rows.get(count);
        if (row != null) {
            mProtoTuple = new ArrayList<Object>();
            for (int colNum = 0; colNum < colSize; colNum++) {
                mProtoTuple.add(row.getObject(colNum));
            }
        } else {
            return null;
        }
        Tuple t = mTupleFactory.newTuple(mProtoTuple);
        mProtoTuple.clear();
        return t;

    }

    @Override
    public void prepareToRead(RecordReader arg0, PigSplit arg1) throws IOException {
        log.info("Inside Prepare to Read");
        session = null;
        if (query == null) {
            throw new IOException("SQL Insert command not specified");
        }
        if (user == null || pass == null) {
            log.info("Creating Session with user name and password as: " + user + " : " + pass);
            session = ConnectionCassandra.connectToCassandra1(jdbcURL, user, pass);
            log.info("Session Created");
        } else {
            session = ConnectionCassandra.connectToCassandra1(jdbcURL, user, pass);
        }
        log.info("Executing Query " + query);
        result = session.execute(query);
        log.info("Query Executed :" + query);
        rows = result.all();
        count = 0;
        colSize = result.getColumnDefinitions().asList().size();
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        log.info("Inside Set Location");
        try {
            FileInputFormat.setInputPaths(job, location);
        } catch (Exception exception) {
            log.info("Some thing went wrong : " + exception.getMessage());
            log.debug(exception);
        }

    }
}

Above are my Pig script and Java code. Here /user/sampleFile.txt is a dummy file with no data. I am getting the following exception:

Pig Stack Trace

ERROR 1066: Unable to open iterator for alias A

org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1066: Unable to open iterator for alias A
    at org.apache.pig.PigServer.openIterator(PigServer.java:892)
    at org.apache.pig.tools.grunt.GruntParser.processDump(GruntParser.java:774)
    at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:372)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:198)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:173)
    at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:84)
    at org.apache.pig.Main.run(Main.java:484)
    at org.apache.pig.Main.main(Main.java:158)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.io.IOException: Job terminated with anomalous status FAILED
    at org.apache.pig.PigServer.openIterator(PigServer.java:884)
    ... 13 more

Where is the Pig code? From the error it looks like your Pig statement isn't correct. - San
@ANI My simple Pig script is: register /tmp/UDFJars/CassandraUDF_1-0.0.1-SNAPSHOT-jar-with-dependencies.jar; A = Load '/user/sampleFile.txt' using udf.DBLoader('10.xx.xxx.4','username','password','select * from customer limit 10') as (f1 : chararray); DUMP A; - Vivek Basidoni
The result of select * from customer limit 10 is stored in a single field, f1:chararray! Does the table have only one column? - San
No, but I also tried changing the query to select customer_id from customer limit 10 and still get the same error. I am also curious to know whether this method of fetching data is correct. - Vivek Basidoni
Another way would be to use Sqoop to read the data into Hive, then use HCatalog to read the data into Pig. These tools already exist, and there is no point in reinventing the wheel. If you must create a UDF, then just embed the Sqoop import command in your UDF. - Nazar Merza

1 Answer


Vivek, do you even get into prepareToRead? (I see you added some logging, so it would be good to know what actually ends up in the log.) It would also help to post the full stack trace, since what you posted does not include the underlying exception.

Just some thoughts: I have never tried writing a LoadFunc without implementing my own InputFormat and RecordReader. TextInputFormat checks that the file exists and looks at its size (it creates a number of InputSplits based on the file sizes), so if your dummy file is empty, there is a good chance that either no InputSplits are produced or a single zero-length InputSplit is produced. A zero-length split may be what causes Pig to throw that exception. So my suggestion is to implement your own InputFormat (it's actually pretty easy).

As a quick experiment, also try:

set pig.splitCombination false

Probably it won't help, but it's easy to try.
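
Separately from the split question, one thing in the posted getNext() is worth checking: it calls rows.get(count) without ever incrementing count or comparing it against rows.size(). Pig expects LoadFunc.getNext() to return null once the input is exhausted, so as posted the method would either keep returning the first row or throw IndexOutOfBoundsException on an empty result. Below is a minimal, dependency-free sketch of the cursor logic. RowCursor is a hypothetical helper name used only for illustration; it is not part of Pig or the DataStax driver.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not a Pig or driver class): walks a buffered
// list of rows exactly once and signals the end with null.
class RowCursor<T> {
    private final List<T> rows;
    private int position = 0;

    RowCursor(List<T> rows) {
        this.rows = rows;
    }

    // Returns the next row and advances, or returns null when no rows remain.
    T next() {
        if (position >= rows.size()) {
            return null;
        }
        return rows.get(position++);
    }

    public static void main(String[] args) {
        RowCursor<String> cursor = new RowCursor<>(Arrays.asList("a", "b"));
        List<String> seen = new ArrayList<>();
        // Same loop shape Pig uses: keep asking until null comes back.
        for (String row = cursor.next(); row != null; row = cursor.next()) {
            seen.add(row);
        }
        System.out.println(seen); // prints [a, b]
    }
}
```

In DBLoader the same idea would mean checking count against rows.size() at the top of getNext(), returning null when count has reached the end, and incrementing count after each row is read.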