I have a Pig script with a Python UDF that is supposed to generate user-level features. My data is preprocessed by Pig and then sent to a UDF as a list of tuples. The UDF processes the tuples and returns a chararray with the features computed per user. The relevant code looks like this:
-- ... loading data above
data = FOREACH data_raw GENERATE user_id, ...; -- some other metrics as well
-- Group by ids
grouped_ids = GROUP data BY user_id PARALLEL 20;
-- Limit the ids to process
userids = LIMIT grouped_ids (long)'$limit';
-- Generate features
user_features = FOREACH userids {
    GENERATE group as user_id:chararray,
        udfs.extract_features(data) as features:chararray;
}
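For reference, the UDF interface described above (a bag of tuples in, one chararray out) can be sketched roughly as follows. This is only a minimal illustration, not the actual feature code: the body of extract_features is a placeholder, and the pig_util import assumes Pig's Python streaming setup (under Jython the outputSchema decorator is normally provided by Pig itself), so a fallback is defined to let the sketch run outside Pig.

```python
# Hypothetical sketch of the UDF's shape; the real feature logic is not shown.
try:
    # Assumption: provided by Pig when the script is registered as a Python UDF.
    from pig_util import outputSchema
except ImportError:
    def outputSchema(schema):
        # Fallback decorator so the sketch also runs outside of Pig.
        def decorator(func):
            func.outputSchema = schema
            return func
        return decorator


@outputSchema("features:chararray")
def extract_features(bag):
    """Receive one user's bag (a list of tuples) and return a feature string."""
    rows = list(bag) if bag is not None else []
    n_rows = len(rows)
    n_fields = len(rows[0]) if rows else 0
    # Placeholder features: row count and tuple width for this user.
    return "n_rows=%d;n_fields=%d" % (n_rows, n_fields)
```

The function is called once per group, so all of the per-user work happens wherever the FOREACH is executed.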
The UDF clearly runs in the reduce phase, but for some reason it always goes to a single reducer, which takes quite some time. I am looking for a way to parallelize its execution: the job currently takes 22 minutes in total, of which 18 are spent in this single reducer.
Pig typically tries to allocate about 1GB of data per reducer, and my data is indeed smaller than that (around 300-700MB). The UDF itself is quite time-consuming, though, so running it on a single reducer is clearly not optimal while the rest of my cluster sits idle.
Things I have tried:
- Setting default_parallel impacts the whole script, but still does not get the reducer with the UDF to parallelize.
- Manually setting PARALLEL on GROUP data BY user_id parallelizes the output of the group and invokes multiple reducers, but at the point where the UDF kicks in, it is again a single reducer.
- Setting pig.exec.reducers.bytes.per.reducer, which lets you set, for instance, a maximum of 10MB of data per reducer. It clearly works for other parts of my script (and ruins the parallelism, as expected, since it also affects data preparation at the beginning of my pipeline), but again DOES NOT allow more than one reducer to run with this UDF.
As far as I understand what is going on, the shuffle phase can hash the user_id to one or more reducers. So I don't see why this script would not be able to spawn multiple reducers, instantiate the UDF on each of them, and route each user's data to the correct reducer based on user_id. There is no significant skew in my data.
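The expectation described above can be illustrated with a small sketch of hash partitioning. Hadoop's default partitioner assigns a key to a reducer as (hash of key, made non-negative) modulo the number of reducers; the snippet below mimics that in Python. The function name reducer_for, the crc32 hash used as a stand-in for Java's hashCode, and the generated user ids are all assumptions made for illustration.

```python
import zlib
from collections import Counter


def reducer_for(user_id, num_reducers):
    """Pick a reducer index for a key, in the style of a hash partitioner."""
    # crc32 is a deterministic stand-in for Java's hashCode();
    # the mask keeps the value non-negative before taking the modulo.
    key_hash = zlib.crc32(user_id.encode("utf-8")) & 0x7FFFFFFF
    return key_hash % num_reducers


# Hypothetical user ids, spread over 20 reducers as with PARALLEL 20.
user_ids = ["user_%d" % i for i in range(1000)]
load = Counter(reducer_for(u, 20) for u in user_ids)

for reducer, count in sorted(load.items()):
    print("reducer %2d -> %d users" % (reducer, count))
```

With unskewed keys, this kind of partitioning spreads the users over many reducers, which is the behavior I would expect for the step that calls the UDF.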
I am clearly missing something here but fail to see what. Does anyone have any explanation and/or suggestion?
EDIT: I updated the code, as something important was missing: I was running a LIMIT between the GROUP BY and the FOREACH. I also cleaned up irrelevant information and expanded the inline code onto separate lines for readability.