I have records with bag-typed fields, and I'm trying to merge the bags of records whose key fields are otherwise identical (the remaining fields are discarded).
The data looks something like:
u08 u08an id {(web)} 0 0 {(GB),(US)} an
u08 u08an id {(ars)} 0 0 {(GB),(RU)} an
u09 u09an id {(web)} 0 0 {(GB)} an
u09 u09an id {(web)} 0 0 {(US)} an
u10 u10an id {(web)} 0 0 {(GB)} an
u10 u10an id {(ars)} 0 0 {(GB)} an
u11 u11an id {(web)} 0 0 {(GB)} an
u11 u11an id {(web)} 0 0 {(GB)} an
I would like to obtain (after discarding the irrelevant fields and reordering the rest) something like:
u08 u08an an {(GB),(US),(RU)}
u09 u09an an {(GB),(US)}
u10 u10an an {(GB)}
u11 u11an an {(GB)}
The input is loaded using the following schema:
user_identities = LOAD '$partner_user_identities_location' AS (
    user_id: chararray,
    partner_user_id: chararray,
    partner_user_id_type: chararray,
    sync_types: bag{tuple(chararray)},
    synced_first_timestamp: double,
    synced_last_timestamp: double,
    country_codes: bag{tuple(chararray)},
    partner_id: chararray
);
When I generate both the merged sync_types and the merged country_codes, everything works as expected. When I generate only country_codes, however, the tuples do not appear to be sorted before DISTINCT is applied, so non-adjacent duplicates remain in the output.
Running the following snippet (in local mode):
user_identities = GROUP user_identities BY (user_id, partner_user_id, partner_id);
user_identities = FOREACH user_identities {
    sync_types = FOREACH user_identities GENERATE flatten(sync_types);
    sync_types = DISTINCT sync_types;
    country_codes = FOREACH user_identities GENERATE flatten(country_codes);
    country_codes = DISTINCT country_codes;
    GENERATE flatten(group) AS (user_id, partner_user_id, partner_id), sync_types, country_codes;
}
DUMP user_identities;
Outputs:
(u08,u08an,an,{(ars),(web)},{(GB),(RU),(US)})
(u09,u09an,an,{(web)},{(GB),(US)})
(u10,u10an,an,{(ars),(web)},{(GB)})
(u11,u11an,an,{(web)},{(GB)})
However, if I change the inner GENERATE statement to GENERATE flatten(group) AS (user_id, partner_user_id, partner_id), country_codes;, omitting sync_types, I get the following output (note the duplicate (GB) in the first record):
(u08,u08an,an,{(GB),(RU),(GB),(US)})
(u09,u09an,an,{(GB),(US)})
(u10,u10an,an,{(GB)})
(u11,u11an,an,{(GB)})
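For reference, this is the complete variant that produces the output above, assuming the final GENERATE is the only line that differs from the first snippet and that user_identities is still the relation returned by the LOAD statement:

```pig
user_identities = GROUP user_identities BY (user_id, partner_user_id, partner_id);
user_identities = FOREACH user_identities {
    -- sync_types is still computed here, but it is no longer generated
    sync_types = FOREACH user_identities GENERATE flatten(sync_types);
    sync_types = DISTINCT sync_types;
    country_codes = FOREACH user_identities GENERATE flatten(country_codes);
    country_codes = DISTINCT country_codes;
    -- the only change: sync_types removed from the generated fields
    GENERATE flatten(group) AS (user_id, partner_user_id, partner_id), country_codes;
}
DUMP user_identities;
```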
As I'm not using sync_types in my script, I see no reason why I should generate it except as a workaround for this issue.
Is this a known bug in Pig (or in the local mode of Pig)? Or am I not merging the bags correctly?
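One alternative formulation that might sidestep the nested FOREACH over the grouped bag is to flatten country_codes before grouping and apply DISTINCT to a plain projection afterwards. The following is only an untested sketch, not a confirmed fix: the aliases flat_identities, grouped_identities and merged_identities and the field name country_code are hypothetical, and user_identities is assumed to be the relation as loaded (before any GROUP).

```pig
-- Hypothetical aliases; none of them appear in the original script.

-- One row per (key fields, country code).
-- Caveat: FLATTEN drops records whose country_codes bag is empty.
flat_identities = FOREACH user_identities GENERATE
    user_id, partner_user_id, partner_id,
    FLATTEN(country_codes) AS country_code;

grouped_identities = GROUP flat_identities BY (user_id, partner_user_id, partner_id);

merged_identities = FOREACH grouped_identities {
    -- project the single column, then deduplicate it
    codes = flat_identities.country_code;
    codes = DISTINCT codes;
    GENERATE FLATTEN(group) AS (user_id, partner_user_id, partner_id),
             codes AS country_codes;
}

DUMP merged_identities;
```

Whether this avoids the duplicate (GB) has not been checked here. It differs from the original mainly in that DISTINCT operates on a projected column rather than on the result of a nested FOREACH ... GENERATE flatten(...).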