
I am trying to calculate the difference between consecutive records within each group and also add a row number per group. In Hive this can be done with the LAG and ROW_NUMBER windowing functions. I am trying to recreate this using Pig and Python UDFs.

In the following example, I need the row number to restart at 1 for each name and increment for each new month (new record). I also need the difference in balance from the prior month for each name.

input data

name  month  balance
A     1      10
A     2      5
A     3      15
B     2      20
B     3      10
B     4      45
B     5      50

output data

name  month  balance  row_number  balance_diff
A     1      10       1           0
A     2      5        2           -5
A     3      15       3           10
B     2      20       1           0
B     3      10       2           -10
B     4      45       3           35
B     5      50       4           5
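To pin down the expected semantics, here is a plain-Python sketch (run outside Pig) of Hive-style ROW_NUMBER() and LAG() partitioned by name and ordered by month, applied to the sample input. The function name `add_row_number_and_diff` is hypothetical, and using 0 for the first row of each name follows the expected output above.

```python
from itertools import groupby
from operator import itemgetter


def add_row_number_and_diff(rows):
    """rows: iterable of (name, month, balance) tuples.

    Returns (name, month, balance, row_number, balance_diff) tuples, with
    row_number restarting at 1 for each name and balance_diff taken against
    the previous month's balance for the same name (0 on the first row).
    """
    result = []
    # Sort by name, then month, so each name is one contiguous, ordered group.
    ordered = sorted(rows, key=itemgetter(0, 1))
    for name, group in groupby(ordered, key=itemgetter(0)):
        prev_balance = None
        for row_number, (_, month, balance) in enumerate(group, start=1):
            # Like LAG(balance), but with 0 when there is no earlier row.
            diff = 0 if prev_balance is None else balance - prev_balance
            result.append((name, month, balance, row_number, diff))
            prev_balance = balance
    return result


sample = [("A", 1, 10), ("A", 2, 5), ("A", 3, 15),
          ("B", 2, 20), ("B", 3, 10), ("B", 4, 45), ("B", 5, 50)]
for row in add_row_number_and_diff(sample):
    print(row)
```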

How can I do this using Pig and a Python UDF? Below is what I tried.

Pig

outdat = foreach (group inpdat by name) {
    sorted = order inpdat BY month asc;
    row_details = myudf.row_metrics(sorted.(month, balance));
    generate flatten(sorted), flatten(row_details);
    };

Python UDF

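def row_num(mth):
    # 1-based position of each record within the (already sorted) group
    return [i + 1 for i, _ in enumerate(mth)]

def diff(bal, n=1):
    # Pad with n Nones so there is one result per input balance;
    # rows with no balance n months earlier get 0
    bal = list(bal)
    return [x - y if (x is not None and y is not None) else 0
            for x, y in zip(bal, [None] * n + bal)]

@outputSchema('udfbag:bag{udftuple:tuple(row_number: int, balance_diff: int)}')
def row_metrics(mthbal):
    mth, bal = zip(*mthbal)
    row_number = row_num(mth)
    balance_diff = diff(bal)
    return list(zip(row_number, balance_diff))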

My Python functions work. However, I am having trouble combining the two bags (sorted and row_details) once the results are back in Pig.
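The difficulty is that flattening two bags in the same GENERATE gives their cross product rather than pairing them row by row. The sketch below imitates that with plain Python lists standing in for the two bags of name A (an analogy using `itertools.product`, not Pig code): three tuples against three tuples yields nine rows, most of them mismatched.

```python
from itertools import product

# Plain lists standing in for the two bags of name "A" (not real Pig objects).
sorted_bag = [("A", 1, 10), ("A", 2, 5), ("A", 3, 15)]
row_details = [(1, 0), (2, -5), (3, 10)]

# Roughly what GENERATE flatten(sorted), flatten(row_details) produces:
# every tuple of the first bag combined with every tuple of the second.
cross = [left + right for left, right in product(sorted_bag, row_details)]

print(len(cross))  # 9 rows instead of 3
print(cross[1])    # month 1 paired with month 2's metrics
```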

I have also seen that the Enumerate function in Pig does what I want for the row number. As part of learning Pig, however, I am looking for a solution that uses Python UDFs.

There seem to be bugs in this code: there is an error in [:n] in the Python code, and rownum_and_diff should be row_metrics. - Dhanesh
Hi @Dhanesh, thank you for catching the error in the UDF name (rownum_and_diff); I forgot to rename it. My original code for the diff function works fine. I want to use the same function when I have to calculate the balance difference against the balance two or three months earlier. My original function returns the same number of elements regardless of the value of n (which is what I want). However, I noticed that your code returns fewer elements when n > 1. Thank you again for your help. - nikeshpraj
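For a lag of n months, one way to keep the output the same length as the input is to pad the first n positions with 0, since there is no earlier balance to compare against. A minimal sketch, assuming n >= 1; the name `lag_diff` is hypothetical, and the 0 default mirrors the expected output in the question.

```python
def lag_diff(bal, n=1):
    """Return bal[i] - bal[i - n] for each i, using 0 where i < n.

    The result always has len(bal) elements for any n >= 1.
    """
    bal = list(bal)  # accept tuples, e.g. the output of zip(*...)
    pad = min(n, len(bal))  # never pad past the input length
    return [0] * pad + [cur - prev for cur, prev in zip(bal[n:], bal[:-n])]


print(lag_diff((20, 10, 45, 50)))       # [0, -10, 35, 5]
print(lag_diff((20, 10, 45, 50), n=2))  # [0, 0, 25, 40]
```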

2 Answers


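Try this. Pass the whole sorted bag (including name) to the UDF and have it return every output field, so there is only one bag to flatten.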

Python UDF:

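def row_num(mth):
    # 1-based row number within the sorted group
    return [i + 1 for i, _ in enumerate(mth)]

def diff(bal, n=1):
    # Difference from the balance n rows earlier; the first n rows get 0,
    # so the result has one element per input row for any n >= 1
    return [0] * min(n, len(bal)) + [x - y for x, y in zip(bal[n:], bal[:-n])]


@outputSchema('udfbag:bag{udftuple:tuple(name: chararray, mth: int, balance: int, row_number: int, balance_diff: int)}')
def row_metrics(mthbal):
    # mthbal is the whole sorted bag of (name, month, balance) tuples
    name, mth, bal = zip(*mthbal)
    row_number = row_num(mth)
    balance_diff = diff(bal)
    return list(zip(name, mth, bal, row_number, balance_diff))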

Pig Script:

register 'myudf.py' using jython as myudf;
inpdat = load 'input.dat' using PigStorage(',') as (name:chararray, month:int, balance:int);

outdat = foreach (group inpdat by name) {
    sorted = order inpdat BY month asc;
    row_details = myudf.row_metrics(sorted);
    generate flatten (row_details);
    };

dump outdat;
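As far as I know, Pig supplies the `outputSchema` decorator itself when a script is registered `using jython`, so the UDF file cannot be imported as-is in plain Python. One way to check the logic locally is a no-op stand-in decorator, as in this sketch. The fallback decorator is an assumption for local testing only; this standalone version returns `balance` too, so each row has the five output columns.

```python
try:
    outputSchema  # normally provided by Pig for scripts registered using jython
except NameError:
    def outputSchema(schema):
        """No-op stand-in so the UDF can be exercised outside Pig (assumption)."""
        def decorate(func):
            return func
        return decorate


@outputSchema('udfbag:bag{udftuple:tuple(name:chararray, month:int, balance:int, '
              'row_number:int, balance_diff:int)}')
def row_metrics(rows):
    # rows: the sorted bag as a list of (name, month, balance) tuples
    name, mth, bal = zip(*rows)
    row_number = list(range(1, len(rows) + 1))
    balance_diff = [0] + [cur - prev for cur, prev in zip(bal[1:], bal[:-1])]
    return list(zip(name, mth, bal, row_number, balance_diff))


print(row_metrics([("B", 2, 20), ("B", 3, 10), ("B", 4, 45), ("B", 5, 50)]))
```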

Using the Stitch function from Piggybank worked in my case. I would be interested to learn any other ways to do this.

REGISTER /mypath/piggybank.jar;
REGISTER 'myudf.py' using jython as myudf;
define Stitch org.apache.pig.piggybank.evaluation.Stitch;

inpdat = load 'input.dat' using PigStorage(',') as (name:chararray, month:int, balance:int);

outdat = FOREACH (group inpdat by name) {
    sorted = ORDER inpdat by month asc;
    udf_fields = myudf.row_metrics(sorted.(month, balance));
    -- Stitch joins the i-th tuple of each bag instead of forming a cross product
    generate flatten(Stitch(sorted, udf_fields)) as (name, month, balance, row_number, balance_diff);
};
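Conceptually, Stitch concatenates the tuples of several bags position by position, which is what avoids the cross product. The plain-Python sketch below imitates that for illustration only; `stitch` is a hypothetical helper, not the Piggybank source, and because it uses `zip` it stops at the shortest bag, which is only safe when, as here, both bags have the same number of tuples.

```python
from itertools import chain


def stitch(*bags):
    """Concatenate the i-th tuple of every bag into one flat tuple."""
    return [tuple(chain.from_iterable(tuples)) for tuples in zip(*bags)]


# Plain lists standing in for the sorted bag and the UDF output for name "B".
sorted_bag = [("B", 2, 20), ("B", 3, 10)]
udf_fields = [(1, 0), (2, -10)]
print(stitch(sorted_bag, udf_fields))
```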