Using Pig on a Hadoop cluster, I have a huge bag of huge tuples to which I regularly add fields as I continue to work on this project, and several UDFs that use various fields from it. I want to be able to call a UDF on just a few fields from each tuple and reconnect the result to that particular tuple. Doing a join to reconnect the records using unique IDs takes forever on billions of records.

I think there should be a way to do all of this inside the GENERATE statement, but I can't find the right syntax.
Here is some toy code using a Python UDF to get the idea across.
```pig
Register 'jumper.py' using jython as myfuncs;
jumps = LOAD 'jumps.csv' USING PigStorage(',')
    AS (jumper:int, attempt:int, distance:double, location:chararray);
byJumper = GROUP jumps by jumper;
sigmas = FOREACH byJumper GENERATE
    jumps.jumper, jumps.attempt, jumps.distance, jumps.location,
    myfuncs.conv2sigma(jumps.distance);
rmf sigmas
STORE sigmas INTO 'sigmas' USING PigStorage(',');
```
This produces bags of tuples with a single field in each tuple, rather than tuples of the form I expect.
The input data consists of:
- people (with unique integer IDs),
- their long jump attempts (with integer IDs unique to each person),
- the distance they jumped,
- the location where they were jumping at the time.
For each jump we want to compute how many standard deviations (sigmas) the jumper was from their own average; later we'll correlate sigmas by location to see where jumpers do best. That means calculating the average and standard deviation for each person, then a sigma for each jump, and storing the data with the new sigma field attached.
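For reference, the per-jump sigma described above can be sketched in plain Python, outside Pig. The sketch assumes the population standard deviation, sqrt(sum of squared deviations / n); group_by_jumper and sigmas_for are hypothetical names, and grouping into a dict only roughly mirrors GROUP jumps by jumper.

```python
from math import sqrt


def group_by_jumper(rows):
    """Collect (jumper, attempt, distance, location) rows per jumper id,
    roughly what GROUP jumps by jumper does in the Pig script."""
    groups = {}
    for row in rows:
        groups.setdefault(row[0], []).append(row)
    return groups


def sigmas_for(distances):
    """One sigma per distance: (d - mean) / population standard deviation."""
    n = len(distances)
    mean = sum(distances) / n
    sd = sqrt(sum((d - mean) ** 2 for d in distances) / n)
    if sd == 0:
        # All jumps identical: no spread, so report 0.0 instead of dividing by zero.
        return [0.0] * n
    return [(d - mean) / sd for d in distances]


# A few rows taken from jumps.csv.
rows = [(0, 0, 5.0, 'a'), (0, 1, 6.0, 'b'), (0, 2, 7.0, 'c'),
        (1, 0, 6.0, 'a'), (1, 1, 5.0, 'a')]
for jumper, jumps in group_by_jumper(rows).items():
    print(jumper, sigmas_for([j[2] for j in jumps]))
```

Returning 0.0 when all of a jumper's distances are equal is an arbitrary choice made here only to avoid a division by zero.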
The question is: how do we change this to output tuples of the form (jumper:int, attempt:int, distance:double, location:chararray, sigma:double)?
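One way to think about the target shape: if the function sees whole tuples rather than a projected column, it can append its result to each tuple itself, so nothing has to be reconnected afterwards. The plain-Python sketch below illustrates that idea; attach_per_tuple and centered (a stand-in for conv2sigma) are hypothetical names, and the sketch assumes the function returns exactly one result per input, in input order.

```python
def attach_per_tuple(bag, field_index, fn):
    """Hypothetical helper: call a bag-in/bag-out function on one field of
    every tuple, then append each result to the tuple it came from.

    Assumes fn returns exactly one result per input, in input order.
    """
    column = [t[field_index] for t in bag]
    results = fn(column)
    if len(results) != len(bag):
        raise ValueError("fn must return one result per input tuple")
    return [tuple(t) + (r,) for t, r in zip(bag, results)]


def centered(values):
    # Stand-in for conv2sigma: each value minus the group mean.
    mean = sum(values) / len(values)
    return [v - mean for v in values]


group = [(0, 0, 5.0, 'a'), (0, 1, 6.0, 'b'), (0, 2, 7.0, 'c')]
for row in attach_per_tuple(group, 2, centered):
    print(row)
# (0, 0, 5.0, 'a', -1.0)
# (0, 1, 6.0, 'b', 0.0)
# (0, 2, 7.0, 'c', 1.0)
```

In Pig terms, the analogous approach would presumably be to pass the whole jumps bag to the UDF instead of projections such as jumps.distance, declare a five-field tuple in its output schema, and FLATTEN that single returned bag; this is an untested assumption, not a confirmed solution.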
I have tried FLATTEN in various ways, and it only gets me enormous cross-products. I could change my UDF to take in jumper and attempt, output a triple, and then do a JOIN, but in the real world that solution is enormously impractical because of the size of the data sets.
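The cross-products match what Pig's documentation describes for flattening more than one bag in the same GENERATE: the flattened bags are crossed with each other. The plain-Python model below (not Pig code; the variable names are illustrative) shows the effect on a three-jump group.

```python
from itertools import product

# First three jumps of jumper 0, i.e. part of one group from GROUP jumps by jumper.
group = [(0, 0, 5.0, 'a'), (0, 1, 6.0, 'b'), (0, 2, 7.0, 'c')]

# Projections such as jumps.attempt and jumps.distance each give a bag of 1-field tuples.
attempts = [(t[1],) for t in group]
distances = [(t[2],) for t in group]

# Model of flattening both bags in one GENERATE: every attempt is
# paired with every distance, i.e. the cross product of the two bags.
rows = [a + d for a, d in product(attempts, distances)]
print(len(rows))  # 9, although the group holds only 3 jumps
print(rows[1])    # (0, 6.0): attempt 0 paired with attempt 1's distance
```

Scaled up, flattening all five bags of the original GENERATE for jumper 1 (14 jumps) would give 14 ** 5 = 537,824 rows instead of 14.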
Here's the supporting code and data if you want to try it at home:
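jumper.py (a quick, not very thoughtful implementation; the only important thing here is that it takes a bag as input and produces a bag as output, with one output tuple corresponding to each input tuple):

```python
#!/usr/local/bin/python
# we're forced to use python 2.5.2 :-(
from math import sqrt

# outputSchema is provided by Pig's Jython UDF support when this file is registered
@outputSchema("y:bag{t:tuple(sigma:double)}")
def conv2sigma(bag):
    s = 0.0   # running sum of distances
    n = 0     # number of jumps
    dd = []   # the distances, kept in input order
    print('conv2sigma input bag:')
    print(bag)
    for tup in bag:
        # each input tuple holds a single field: the distance
        d = float(tup[0])
        dd.append(d)
        n += 1
        s += d
    a = s / n  # mean distance
    ss = 0
    for d in dd:
        ss += (d - a) ** 2
    # note: sqrt(ss) is not the standard deviation, which would be sqrt(ss / n)
    sd = sqrt(ss)
    outputBag = []
    for d in dd:
        outputBag.append(((d - a) / sd,))
    print('conv2sigma output bag:')
    print(outputBag)
    return outputBag
```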
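To sanity-check the UDF without a cluster, one option is to stub out the outputSchema decorator (normally supplied by Pig when it registers a Jython script) and call the function directly. The sketch below uses a hypothetical no-op stub and a condensed copy of conv2sigma's arithmetic; with jumper 0's distances it reproduces the sigma the output shows for that jumper's 5.0 jumps, which confirms the UDF divides by sqrt(ss) rather than by the standard deviation.

```python
from math import sqrt


def outputSchema(schema):
    """Hypothetical no-op stand-in for Pig's decorator, for local testing only."""
    def wrap(fn):
        return fn
    return wrap


@outputSchema("y:bag{t:tuple(sigma:double)}")
def conv2sigma(bag):
    # Same arithmetic as jumper.py: divide by sqrt(ss), not sqrt(ss / n).
    dd = [float(t[0]) for t in bag]
    a = sum(dd) / len(dd)
    sd = sqrt(sum((d - a) ** 2 for d in dd))
    return [((d - a) / sd,) for d in dd]


# Jumper 0's nine distances, in jumps.csv order (attempts 0 to 8).
bag = [(5,), (6,), (7,), (5,), (8,), (7,), (6,), (7,), (5,)]
print(conv2sigma(bag)[0][0])  # about -0.3954
```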
The input file, jumps.csv:

```
0,0,5,a
0,1,6,b
0,2,7,c
0,3,5,a
0,4,8,c
0,5,7,b
0,6,6,b
0,7,7,c
0,8,5,a
1,0,6,a
1,1,5,a
1,2,7,b
1,3,4,a
1,4,5,a
1,5,7,b
1,6,8,c
1,7,9,c
1,8,5,a
1,9,4,a
1,10,5,a
1,11,6,b
1,12,8,c
1,13,8,b
2,0,7,b
2,1,5,a
2,2,6,b
2,3,5,a
2,4,7,c
2,5,5,a
2,6,6,c
2,7,5,a
2,8,7,b
2,9,5,a
2,10,6,b
```
The output the script currently produces:

```
{(0),(0),(0),(0),(0),(0),(0),(0),(0)},{(1),(2),(3),(4),(5),(6),(7),(8),(0)},{(6.0),(7.0),(5.0),(8.0),(7.0),(6.0),(7.0),(5.0),(5.0)},{(b),(c),(a),(c),(b),(b),(c),(a),(a)},{(-0.07188851546895898),(0.25160980414135625),(-0.39538683507927425),(0.5751081237516715),(0.25160980414135625),(-0.07188851546895898),(0.25160980414135625),(-0.39538683507927425),(-0.39538683507927425)}
{(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1)},{(8),(0),(1),(2),(3),(4),(5),(6),(7),(9),(10),(11),(12),(13)},{(5.0),(6.0),(5.0),(7.0),(4.0),(5.0),(7.0),(8.0),(9.0),(4.0),(5.0),(6.0),(8.0),(8.0)},{(a),(a),(a),(b),(a),(a),(b),(c),(c),(a),(a),(b),(c),(b)},{(-0.20716308289978433),(-0.03655819109996196),(-0.20716308289978433),(0.1340467006998604),(-0.3777679746996067),(-0.20716308289978433),(0.1340467006998604),(0.30465159249968277),(0.4752564842995052),(-0.3777679746996067),(-0.20716308289978433),(-0.03655819109996196),(0.30465159249968277),(0.30465159249968277)}
{(2),(2),(2),(2),(2),(2),(2),(2),(2),(2),(2)},{(0),(1),(2),(3),(4),(5),(6),(7),(8),(9),(10)},{(7.0),(5.0),(6.0),(5.0),(7.0),(5.0),(6.0),(5.0),(7.0),(5.0),(6.0)},{(b),(a),(b),(a),(c),(a),(c),(a),(b),(a),(b)},{(0.4276686017238498),(-0.2960782627318961),(0.06579516949597684),(-0.2960782627318961),(0.4276686017238498),(-0.2960782627318961),(0.06579516949597684),(-0.2960782627318961),(0.4276686017238498),(-0.2960782627318961),(0.06579516949597684)}
```
Each output record is a tuple of bags, one bag per projected field plus one for the UDF result, and each bag contains single-field tuples, which is not what we want.
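To make that shape concrete, the plain-Python sketch below models the first three entries of jumper 0's record from the output and rebuilds per-jump tuples by position. The transpose helper is hypothetical, and it only yields correct rows if every bag lists its tuples in the same order. The sample output happens to line up (attempt 1 sits next to distance 6.0 and location b, as in jumps.csv), but that ordering is an assumption here, not something this example establishes for Pig bags in general.

```python
# First three entries of jumper 0's output record: one bag per projected
# field plus the UDF result, each bag holding 1-field tuples.
record = (
    [(0,), (0,), (0,)],                          # jumps.jumper
    [(1,), (2,), (3,)],                          # jumps.attempt
    [(6.0,), (7.0,), (5.0,)],                    # jumps.distance
    [('b',), ('c',), ('a',)],                    # jumps.location
    [(-0.07188851546895898,), (0.25160980414135625,),
     (-0.39538683507927425,)],                   # conv2sigma(jumps.distance)
)


def transpose(record):
    """Hypothetical helper: build the i-th row from the i-th tuple of every bag.

    Only meaningful if all bags list their tuples in the same order.
    """
    if len({len(bag) for bag in record}) != 1:
        raise ValueError("bags differ in length")
    return [tuple(t[0] for t in entries) for entries in zip(*record)]


for row in transpose(record):
    print(row)  # jumper, attempt, distance, location, sigma
```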