
I have a Pig script with a Python UDF that is supposed to generate user-level features. My data is preprocessed by Pig and then sent to a UDF as a list (bag) of tuples. The UDF processes the tuples and returns a chararray with the features computed for each user. The relevant part of the script looks like this:

-- ... loading data above
data = FOREACH data_raw GENERATE user_id, ...; -- some other metrics as well
-- Group by ids
grouped_ids = GROUP data BY user_id PARALLEL 20;
-- Limit the ids to process
userids = LIMIT grouped_ids (long)'$limit';
-- Generate features
user_features = FOREACH userids {
  GENERATE group as user_id:chararray,
  udfs.extract_features(data) as features:chararray;
}
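For reference, here is a minimal sketch of what a Jython UDF like udfs.extract_features could look like. The feature logic, the tuple layout (user_id, metric) and the output format are hypothetical assumptions. What the sketch shows is the contract: Pig calls the function once per GROUP output row, passes in that user's bag as a list of tuples, and expects a chararray back. The outputSchema fallback only exists so the file can also run as plain Python outside Pig (assuming Pig's Jython engine already provides outputSchema).

```python
# Hypothetical sketch of udfs.extract_features; the real feature logic is not shown.
# Under Pig's Jython engine, outputSchema is assumed to be provided by Pig; this
# fallback only defines a no-op decorator when running as plain Python.
try:
    outputSchema
except NameError:
    def outputSchema(schema):
        def decorator(func):
            return func
        return decorator


@outputSchema("features:chararray")
def extract_features(bag):
    # 'bag' holds the tuples of ONE user_id group, e.g. [(user_id, metric), ...].
    # Assumption: the second field of each tuple is a numeric metric.
    metrics = [float(t[1]) for t in bag]
    n = len(metrics)
    total = sum(metrics)
    mean = total / n if n else 0.0
    return "n=%d,total=%.1f,mean=%.2f" % (n, total, mean)


if __name__ == "__main__":
    one_group = [("u1", 1), ("u1", 2), ("u1", 3)]
    print(extract_features(one_group))  # n=3,total=6.0,mean=2.00
```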

The UDF clearly runs in the reduce phase, but for some reason it always runs in a single reducer, which takes a long time. I am looking for a way to parallelize its execution: my job currently takes 22 minutes in total, 18 of which are spent in this single reducer.

Pig typically allocates about 1 GB of input data per reducer, and my data is indeed under 1 GB (around 300-700 MB). The UDF, however, is time-consuming, so running it in one reducer while the rest of the cluster sits idle is clearly not optimal.
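Here is a rough sketch of the arithmetic behind that 1 GB figure. It assumes Pig's input-size-based reducer estimate: one reducer per pig.exec.reducers.bytes.per.reducer bytes of input (default 1,000,000,000), capped at pig.exec.reducers.max (default 999). The estimate only applies when neither PARALLEL nor default_parallel is set. The function name is illustrative, not part of Pig's API.

```python
import math

MB = 1000 * 1000

# Illustrative model of Pig's input-size-based reducer estimate (assumption):
# ceil(input / bytes_per_reducer), clamped to the range [1, max_reducers].
def estimate_reducers(input_bytes, bytes_per_reducer=1000 * MB, max_reducers=999):
    reducers = int(math.ceil(float(input_bytes) / bytes_per_reducer))
    return min(max(1, reducers), max_reducers)


if __name__ == "__main__":
    for size_mb in (300, 700):
        print(size_mb, "MB ->", estimate_reducers(size_mb * MB), "reducer(s)")
    # Lowering bytes_per_reducer to 10 MB raises the estimate for the same input.
    print("700 MB at 10 MB/reducer ->", estimate_reducers(700 * MB, bytes_per_reducer=10 * MB))
```

With the defaults, both ends of the 300-700 MB range yield a single reducer.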

Things I have tried:

  • Setting default_parallel affects the whole script, but still does not parallelize the reducer that runs the UDF.
  • Manually setting PARALLEL on GROUP data BY user_id parallelizes the output of the GROUP and launches multiple reducers, but at the point where the UDF kicks in, it is again a single reducer.
  • Setting pig.exec.reducers.bytes.per.reducer, for instance to a maximum of 10 MB per reducer, clearly works for other parts of my script (and, as expected, it also affects the data preparation at the beginning of my pipeline, which ruins the parallelism there), but again DOES NOT allow more than one reducer to run this UDF.

As far as I understand what is going on, if the shuffle phase can hash user_id to one or more reducers, I don't see why this script cannot spawn multiple reducers, instantiate the UDF in each of them, and route the data for each user_id to the correct reducer. There is no significant skew in my data.
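To illustrate that expectation, here is a small simulation of hash partitioning modeled on Hadoop's HashPartitioner, which computes (hash & Integer.MAX_VALUE) % numReduceTasks. zlib.crc32 stands in for the Java hashCode() that is actually used, so the concrete reducer assignments will differ. What the simulation shows is that distinct user_id groups spread fairly evenly over 20 reducers.

```python
import zlib
from collections import Counter


# Stand-in for hash partitioning: crc32 replaces Java's hashCode() so the
# result is deterministic across runs (Python's built-in hash() is salted).
def partition(key, num_reducers):
    return (zlib.crc32(key.encode("utf-8")) & 0x7FFFFFFF) % num_reducers


def reducer_load(user_ids, num_reducers):
    # Count how many distinct user_id groups land on each reducer.
    return Counter(partition(u, num_reducers) for u in set(user_ids))


if __name__ == "__main__":
    users = ["user_%d" % i for i in range(10000)]
    load = reducer_load(users, 20)
    print(len(load), "reducers used; min/max groups per reducer:",
          min(load.values()), max(load.values()))
```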

I am clearly missing something here but fail to see what. Does anyone have any explanation and/or suggestion?

EDIT: I updated the code because something important was missing: I was running a LIMIT between the GROUP BY and the FOREACH. I also removed irrelevant details and split the inline code into separate lines for readability.


2 Answers

1 vote

Your problem is that you are passing the whole data relation as an input parameter to your UDF, so your UDF is called only once with all the data, and hence it runs in only one reducer. I guess you want to call it once for each user_id group, so try a nested FOREACH instead:

data_grouped = GROUP data BY user_id;

user_features = FOREACH data_grouped {
    GENERATE group AS user_id: chararray, 
             udfs.extract_features(data) AS features: chararray;
}

This way you force the UDF to run in as many reducers as the GROUP BY uses.

0 votes

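Having the LIMIT operator between the GROUP BY and the FOREACH prevents my code from running in multiple reducers, even if I explicitly set the parallelism. As far as I can tell, this is because a global LIMIT has to see the output of all group reducers to pick the first N rows, so Pig adds a stage with a single reducer for it, and the FOREACH that follows (including the UDF) runs in that same reducer.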

-- ... loading data above
data = FOREACH data_raw GENERATE user_id, ...; -- some other metrics as well
-- Group by ids
grouped_ids = GROUP data BY user_id PARALLEL 20;
-- Limit the ids to process
userids = LIMIT grouped_ids (long)'$limit';  -- <<< this LIMIT is the culprit
-- Generate features
user_features = FOREACH userids {
  GENERATE group as user_id:chararray,
  udfs.extract_features(data) as features:chararray;
}
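Below is a simplified model of what that LIMIT does. It reflects my assumption about how Pig compiles this plan, not Pig's actual code. Each of the PARALLEL 20 group reducers can trim its own output to $limit groups, but picking the global first $limit groups requires one more stage with a single reducer. The FOREACH with the UDF placed after the LIMIT then ends up in that stage. The helper names are hypothetical.

```python
import zlib
from collections import defaultdict


def partition(key, num_reducers):
    # Deterministic stand-in for the shuffle's hash partitioner.
    return (zlib.crc32(key.encode("utf-8")) & 0x7FFFFFFF) % num_reducers


def limit_then_udf(user_ids, num_reducers, k):
    # GROUP ... PARALLEL n: distinct user_ids are spread over n reducers.
    per_reducer = defaultdict(list)
    for u in sorted(set(user_ids)):
        per_reducer[partition(u, num_reducers)].append(u)
    # Stage 1: each group reducer can apply the LIMIT locally.
    survivors = []
    for r in sorted(per_reducer):
        survivors.extend(per_reducer[r][:k])
    # Stage 2: the global LIMIT runs in ONE reducer, and the UDF after it runs there too.
    kept = survivors[:k]
    return {0: len(kept)}  # reducer id -> number of UDF calls


def udf_then_limit(user_ids, num_reducers):
    # UDF in the FOREACH right after GROUP: each group reducer calls it for its own groups.
    calls = defaultdict(int)
    for u in set(user_ids):
        calls[partition(u, num_reducers)] += 1
    return dict(calls)


if __name__ == "__main__":
    users = ["user_%d" % i for i in range(10000)]
    print("LIMIT before UDF:", limit_then_udf(users, 20, 500))
    print("UDF before LIMIT:", len(udf_then_limit(users, 20)), "reducers call the UDF")
```

The trade-off is visible in the model: moving the LIMIT after the FOREACH spreads the UDF calls over all group reducers, but the UDF then runs for every user_id rather than only for $limit of them.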

Once the LIMIT is moved after the FOREACH, the UDF runs on the predefined number of reducers:

-- ... loading data above
data = FOREACH data_raw GENERATE user_id, ...; -- some other metrics as well
-- Group by ids
grouped_ids = GROUP data BY user_id PARALLEL 20;
-- Generate features
user_features = FOREACH grouped_ids {
  GENERATE group as user_id:chararray,
  udfs.extract_features(data) as features:chararray;
}
-- Limit the features
user_features_limited = LIMIT user_features (long)'$limit';
-- ... process further and persist

So my attempt to optimize by limiting the incoming user_ids early turned out to be counter-productive for parallelism.