I am trying to calculate the difference between consecutive records within each group and also add a row number within each group. In Hive this can be done with the lag and row_number windowing functions. I am trying to recreate it in Pig with Python UDFs.
In the following example, I need the row number to restart from 1 for each name and increment with each new month (new record). I also need the difference in balance from the prior month for each name.
input data
name month balance
A 1 10
A 2 5
A 3 15
B 2 20
B 3 10
B 4 45
B 5 50
output data
name month balance row_number balance_diff
A 1 10 1 0
A 2 5 2 -5
A 3 15 3 10
B 2 20 1 0
B 3 10 2 -10
B 4 45 3 35
B 5 50 4 5
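To pin down the intended semantics outside of Pig, here is a minimal plain-Python sketch of the per-group row number and lagged difference described above. The function name add_row_number_and_diff is made up for illustration, and the first row of each group is assumed to get a difference of 0.

```python
from itertools import groupby
from operator import itemgetter

# Sample input rows: (name, month, balance)
rows = [
    ("A", 1, 10), ("A", 2, 5), ("A", 3, 15),
    ("B", 2, 20), ("B", 3, 10), ("B", 4, 45), ("B", 5, 50),
]


def add_row_number_and_diff(rows):
    """Yield (name, month, balance, row_number, balance_diff) per name."""
    ordered = sorted(rows, key=itemgetter(0, 1))  # sort by name, then month
    for name, group in groupby(ordered, key=itemgetter(0)):
        prev = None  # no prior balance at the start of each group
        for row_number, (_, month, balance) in enumerate(group, start=1):
            # Assumption: the first row of a group has a difference of 0.
            diff = 0 if prev is None else balance - prev
            yield (name, month, balance, row_number, diff)
            prev = balance


for record in add_row_number_and_diff(rows):
    print(*record)
```

The row number restarts because enumerate is called once per group, and the difference restarts because prev is reset to None for each name.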
How can I do this using Pig and a Python UDF? Below is what I tried.
PIG
output = foreach (group input by name) {
    sorted = order input by month asc;
    row_details = myudf.row_metrics(sorted.(month, balance));
    generate flatten(sorted), flatten(row_details);
};
Python UDF
def row_num(mth):
    return [x + 1 for x, y in enumerate(mth)]

def diff(bal, n=1):
    # Pair each balance with the one n rows earlier. The first n rows have no
    # predecessor, so they are padded with None and get a difference of 0.
    return [x - y if (x is not None and y is not None) else 0
            for x, y in zip(bal, [None] * n + list(bal))]

@outputSchema('udfbag:bag{udftuple:tuple(row_number: int, balance_diff: int)}')
def row_metrics(mthbal):
    mth, bal = zip(*mthbal)
    row_number = row_num(mth)
    balance_diff = diff(bal)
    return zip(row_number, balance_diff)
My Python functions work on their own. However, I am having trouble combining the two bags (sorted and row_details) once I bring the results into Pig: flattening both bags in the same generate pairs every sorted row with every row_details row instead of matching them up one to one. Any help is much appreciated.
I have also seen the enumerate function in Pig doing what I want with the row number. As part of learning Pig, however, I am looking for a solution using Python UDFs.
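One possible way to sidestep combining two bags is to have the UDF return the original fields together with the new ones, so that only a single bag needs to be flattened. The sketch below assumes Pig passes the sorted bag to the Jython UDF as an ordered list of (name, month, balance) tuples. The name row_metrics_joined is hypothetical, and the outputSchema fallback is there only so the file also runs under plain Python.

```python
# Pig's Jython runtime provides outputSchema; this fallback is an assumption
# added only so the module can be imported and tested outside Pig.
try:
    outputSchema
except NameError:
    def outputSchema(schema):
        def wrap(func):
            return func
        return wrap


@outputSchema('rows:bag{t:tuple(name:chararray, month:int, balance:int, '
              'row_number:int, balance_diff:int)}')
def row_metrics_joined(sorted_rows):
    """Return each input row extended with row_number and balance_diff."""
    out = []
    prev_balance = None
    for i, (name, month, balance) in enumerate(sorted_rows):
        if prev_balance is None or balance is None:
            diff = 0  # first row of the group, or a missing balance
        else:
            diff = balance - prev_balance
        out.append((name, month, balance, i + 1, diff))
        prev_balance = balance
    return out
```

On the Pig side this would presumably be called inside the nested foreach as generate flatten(myudf.row_metrics_joined(sorted)); so the whole sorted bag goes in and one combined bag comes out, avoiding the cross product of two flattened bags.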
[:n] in python code and rownum_and_diff should be row_metrics - Dhanesh