5
votes

I have a Pig script that involves two Pig relations, let's say A and B. A is a small relation, and B is a big one. My UDF should load all of A into memory on each machine and then use it while processing B. Currently I do it like this:

A = FOREACH smallRelation GENERATE ...
B = FOREACH largeRelation GENERATE propertyOfB;
STORE A INTO 'templocation';
C = FOREACH B GENERATE CustomUdf(propertyOfB);

I then have every machine load from 'templocation' to get A. This works, but I have two problems with it.

  1. My understanding is that I should be using the HDFS-backed distributed cache somehow, but I'm not sure how to load a relation directly into it.
  2. When I reload the file in my UDF, I have to write logic to parse the output of A that was written to the file, when I'd rather be working directly with bags and tuples (is there a built-in Pig Java function to parse Strings back into Bag/Tuple form?).

Does anyone know how it should be done?

1
Do A and B have columns on which you could do a JOIN? - alexeipab
In this case, yes, they have the same data and could be joined. I need to compare every row of A with every row of B, though. I guess I could do a cross join, but wouldn't that be even less efficient? I'd be re-processing B |A| - 1 more times than necessary, and I'd lose the ability to run all rows of B against a single row of A at once, which is required. - Manny
Can you post an example of input and output data? - alexeipab
A: {id: chararray, attributes: {tuple_of_tokens: (token: chararray)}} and C is a bag of two-field tuples, where the first field is the id of A and the second is a tuple containing the top N B for that A. I'm hoping there's a generic solution to this, though; this isn't the only place I need to use prior Pig results in a UDF. - Manny
If you can do the join on the id column, then you could use a replicated join, which is a map-side join: J = JOIN B BY (id), A BY (id) USING 'replicated'; Apache Pig will load A into memory on each data node. Then pass J to the UDF. I think this will take place during the map stage, and thus will be efficient. - alexeipab
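A minimal sketch of the replicated join suggested above, assuming both relations really do carry an id column (relation and field names here are illustrative, not from the original script):

```pig
-- Replicated join: the LAST relation listed (A, the small one) is loaded
-- into memory on every mapper, so no reduce phase is needed.
J = JOIN B BY id, A BY id USING 'replicated';

-- After the join, fields are disambiguated with the :: prefix.
C = FOREACH J GENERATE CustomUdf(B::id, A::id);
```

Note that with `USING 'replicated'`, Pig requires the in-memory relation to be the last one in the JOIN list, and it must be small enough to fit in each mapper's heap.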

1 Answer

1
votes

Here's a trick that will work for you.

You do a GROUP ALL on A first, which "bags" all the data in A into one field. Then artificially add a common field to both A and B and join them. This way, for each tuple in the enhanced B, you will have the full data of A available for your UDF to use.

It's like this:

(say that originally A has fields fa1, fa2, fa3, and B has fields fb1, fb2)

-- add an artificial join key with value 'xx'
B_aux = FOREACH B GENERATE 'xx' AS join_key, fb1, fb2;
A_all = GROUP A ALL;
-- note: project from A_all (not A); $1 is the bag holding every tuple of A
A_aux = FOREACH A_all GENERATE 'xx' AS join_key, $1 AS all_of_A;
A_B_JOINED = JOIN B_aux BY join_key, A_aux BY join_key USING 'replicated';

C = FOREACH A_B_JOINED GENERATE CustomUdf(fb1, fb2, all_of_A);

Since this is a replicated join, it is a map-side-only join: the small side (A_aux, a single tuple holding all of A) is loaded into memory on each mapper, and no reduce phase is needed.