
I'm trying to pass a DataBag (final) as input to a UDF.

 dump final;

gives:

(4,john,john,David,Banking ,4,M,20-01-1994,78.65,345000,Arkansasdest1,Destination)
(4,john,john,David,Banking ,4,M,20-01-1994,78.65,345000,Arkanssdest2,Destination)
(4,johns,johns,David,Banking ,4,M,20-01-1994,78.65,345000,ArkansasSrc1,source)
(4,johns,johns,David,Banking ,4,M,20-01-1994,78.65,345000,ArkansaSrc2,source)

I'm about to write a UDF that processes the above DataBag and finds mismatches between source and destination. Before doing that, I need to check whether my UDF accepts a DataBag at all, so I wrote the sample UDF below:

package PigUDFpck;

import java.io.IOException;
import java.util.Iterator;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;


public class databag extends EvalFunc<DataBag> {
TupleFactory mTupleFactory = TupleFactory.getInstance();
BagFactory mBagFactory = BagFactory.getInstance();

public DataBag exec(Tuple input) throws IOException { // different return type

    DataBag result = mBagFactory.newDefaultBag(); // change here
    DataBag values = (DataBag)input.get(0);
    for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
        Tuple tuple = iterator.next();

        //logic
        Tuple t = mTupleFactory.newTuple(); // getInstance() is static; the field already holds the factory


        t.append(tuple);

        result.add(t);
    }
    return result; // change here
}

}

After that I registered the JAR and defined the UDF using:

REGISTER /usr/local/pig/UDF/UDFBAG.jar;
DEFINE Databag Databag(); // not sure how to define it 

I got this warning after the DEFINE:

2017-02-16 19:07:05,875 [main] WARN org.apache.pig.newplan.BaseOperatorPlan - Encountered Warning IMPLICIT_CAST_TO_INT 2 time(s).

final1 = FOREACH final GENERATE(Databag(final));

ERROR 1200: Pig script failed to parse: Invalid scalar projection: final : A column needs to be projected from a relation for it to be used as a scalar

Please help me define the UDF and show me how to pass a DataBag to it.

Thanks

Your code looks good; I'm not sure why you are getting the warning and the error. Can you try one change in your for loop: instead of using an iterator, use for (Tuple tuple : values) { // your code }. Also, why are you creating a new tuple for each row? - Rajen Raiyarela
Hi Rajen, could you please tell me the format for defining the UDF? - Vickyster

1 Answer


Try

final1 = FOREACH final GENERATE(Databag(*));

Though as far as I can see, your final contains tuples, not bags of tuples, so you'll probably need to group it by some key first. In that case it will be something like

final1 = FOREACH (group final [by key or all]) GENERATE(Databag(final));
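
A concrete version of the grouped form might look like the sketch below. It assumes the JAR contains the class exactly as declared in the question (package PigUDFpck, class databag, which is case-sensitive), so the DEFINE uses the fully qualified name. The alias grouped is only illustrative.

```pig
REGISTER /usr/local/pig/UDF/UDFBAG.jar;

-- Assumption: the JAR holds the class PigUDFpck.databag from the question.
-- DEFINE maps a short alias to the fully qualified, case-sensitive class name.
DEFINE Databag PigUDFpck.databag();

-- GROUP ... ALL collects every tuple of final into one bag, also named final.
grouped = GROUP final ALL;

-- Inside this FOREACH, final refers to the bag field of grouped,
-- so the UDF receives a DataBag as input.get(0).
final1 = FOREACH grouped GENERATE Databag(final);
DUMP final1;
```

To compare source and destination rows per record rather than across the whole relation, a GROUP final BY on a key field would take the place of GROUP final ALL.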