
My input data is in the format below.

    col1  col2  col3    effective date  expiry date
1   Q1    A1    Value1  01/01           01/02
2   Q1    A1    Value1  01/02           01/03
3   Q1    A1    Value1  01/03           01/05
4   Q1    A1    Value2  01/05           01/06
5   Q1    A1    Value2  01/06           01/07
6   Q1    A1    Value2  01/07           01/08
7   Q1    A1    Value1  01/08           01/11
8   Q1    A1    Value1  01/11           12/31

I need to remove duplicates based on the values of col1, col2, and col3, but not all of the duplicates: records are considered duplicates only until the value of col3 changes to a different value. For example, in the data above, Value1 changes to Value2 at record 4, so among records 1, 2, and 3 only record 1 should be retained; among records 4, 5, and 6 only record 4 should be retained; and among records 7 and 8 only record 7 should be retained. The last two columns are date columns (effective date and expiry date). A run of duplicates like 1, 2, and 3 could be of any length (e.g. records 1 through 5 could all have the same value), or there could be no duplicates at all.
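To pin down the rule, here is a minimal sketch in Python (just to express the logic, independent of the eventual Pig/MapReduce solution; the data is the sample above): keep a row only when its (col1, col2, col3) key differs from the previous row's key, with rows already sorted by effective date.

```python
# Sketch of the dedup rule: keep the first row of each consecutive
# run of identical (col1, col2, col3), in effective-date order.
rows = [
    ("Q1", "A1", "Value1", "01/01", "01/02"),
    ("Q1", "A1", "Value1", "01/02", "01/03"),
    ("Q1", "A1", "Value1", "01/03", "01/05"),
    ("Q1", "A1", "Value2", "01/05", "01/06"),
    ("Q1", "A1", "Value2", "01/06", "01/07"),
    ("Q1", "A1", "Value2", "01/07", "01/08"),
    ("Q1", "A1", "Value1", "01/08", "01/11"),
    ("Q1", "A1", "Value1", "01/11", "12/31"),
]

def dedup(sorted_rows):
    kept, prev_key = [], None
    for row in sorted_rows:      # assumed sorted by effective date
        key = row[:3]            # (col1, col2, col3)
        if key != prev_key:      # key changed -> start of a new run
            kept.append(row)
        prev_key = key
    return kept

for row in dedup(rows):
    print(row)                   # records 1, 4 and 7 of the sample
```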

I have two approaches in mind, but I am not sure how to code either of them.

  1. Generate a keychange column (1 or 0) that is 0 for all the dupes and is set to 1 whenever the key (the combination of col1, col2, col3) changes; I could then filter on this column. For this I would need to write a UDF (or is a UDF with similar functionality already available?). Since this requires the input to be in sorted order when passed to the UDF, is it possible to pass sorted data to a UDF, and if so, how? What kind of UDF should this be? Alternatively, if I write MapReduce code, how should I proceed: should I just emit the records in the mapper and do all the sorting and column generation in the reducer? Please let me know your thoughts (I am new to MapReduce programming, so your ideas will help me a lot in learning, thanks!).
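The keychange column from approach 1 can be sketched as a single pass over the sorted records (Python standing in for the reducer logic; the function name is an assumption, not an existing UDF):

```python
# Reducer-style sketch of approach 1: append a keychange flag that is 1
# when the (col1, col2, col3) key differs from the previous record and
# 0 for every duplicate inside the same run; filtering on flag == 1
# then keeps only the first record of each run.
def add_keychange(sorted_rows):
    out, prev_key = [], None
    for row in sorted_rows:          # assumed sorted by effective date
        key = row[:3]
        out.append(row + (1 if key != prev_key else 0,))
        prev_key = key
    return out

sample = [
    ("Q1", "A1", "Value1", "01/01", "01/02"),
    ("Q1", "A1", "Value1", "01/02", "01/03"),
    ("Q1", "A1", "Value1", "01/03", "01/05"),
    ("Q1", "A1", "Value2", "01/05", "01/06"),
    ("Q1", "A1", "Value2", "01/06", "01/07"),
    ("Q1", "A1", "Value2", "01/07", "01/08"),
    ("Q1", "A1", "Value1", "01/08", "01/11"),
    ("Q1", "A1", "Value1", "01/11", "12/31"),
]
flagged = add_keychange(sample)
kept = [row[:5] for row in flagged if row[5] == 1]
print(kept)                          # records 1, 4 and 7 of the sample
```

In MapReduce terms this corresponds to emitting the records in the mapper keyed so that each (col1, col2, col3) group arrives at the reducer sorted by effective date, and running the pass above in the reducer.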

  2. From the documentation of the "over" function, it compares only the same column of the previous record and the current record. If I could instead compare the expiry date (col5) of the current record with the effective date (col4) of the next record, after sorting by effective date (col4) in ascending order, I could group by col1, col2, and col3 and eliminate the records whose effective date equals the previous record's expiry date. But I am not sure how to compare two different columns using the "over" function. Please let me know your thoughts on this one too.
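Approach 2 can likewise be sketched as a lag comparison (Python here just to express the logic; a Pig implementation would need the same sort first):

```python
# Sketch of approach 2: after sorting by effective date, a record is a
# continuation of the previous one (and can be dropped) when it has the
# same (col1, col2, col3) key and its effective date equals the previous
# record's expiry date.
def drop_continuations(sorted_rows):
    kept, prev = [], None
    for row in sorted_rows:
        same_key = prev is not None and row[:3] == prev[:3]
        continues = same_key and row[3] == prev[4]  # effective == prior expiry
        if not continues:
            kept.append(row)
        prev = row
    return kept

sample = [
    ("Q1", "A1", "Value1", "01/01", "01/02"),
    ("Q1", "A1", "Value1", "01/02", "01/03"),
    ("Q1", "A1", "Value1", "01/03", "01/05"),
    ("Q1", "A1", "Value2", "01/05", "01/06"),
    ("Q1", "A1", "Value2", "01/06", "01/07"),
    ("Q1", "A1", "Value2", "01/07", "01/08"),
    ("Q1", "A1", "Value1", "01/08", "01/11"),
    ("Q1", "A1", "Value1", "01/11", "12/31"),
]
print(drop_continuations(sample))    # records 1, 4 and 7 of the sample
```

Note that this variant keys off the dates: a record whose effective date does not abut the previous record's expiry date starts a new run even if col3 has not changed.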

Please let me know if there is a better way to solve this. Thank you for your time!

What you can do is group the rows using col1, col2, and col3, then pass each row of the group to a UDF. In the UDF, check whether the dates are continuous and then take a decision. – Vikas Madhusudana
Hi Vikas, thanks for the input. I was thinking of the same approach and am writing that UDF now. I have one question, though: I will have to write multiple tuples to the output bag, e.g. records 1 and 7 for Q1, A1, and Value1, and I am not sure how to write the output schema. Currently I am using: Schema bagSchema = new Schema(new Schema.FieldSchema("filtered", intuple, DataType.TUPLE)); – Aandal
But I am not sure how to add multiple tuples. Can you please suggest? I am referring to the guide at spryinc.com/blog/guide-user-defined-functions-apache-pig. Also, can you please explain what these lines are for: Schema bagSchema = new Schema(new Schema.FieldSchema("pair", tupleSchema, DataType.TUPLE)); bagSchema.setTwoLevelAccessRequired(true); Schema.FieldSchema bagFs = new Schema.FieldSchema("pairs", bagSchema, DataType.BAG); Why do we need to define another bag in the last line? – Aandal

1 Answer


Assumption - Input is a CSV file.

A = LOAD 'test.csv' USING PigStorage(',');
-- group keys must be parenthesized when there is more than one
B = GROUP A BY ($0, $1, $2);
C = FOREACH B {
    -- keep a single row from each (col1, col2, col3) group
    D = LIMIT A 1;
    GENERATE FLATTEN(D);
};
DUMP C;

Hope this helps.