
In Hadoop I have a collection of datapoints, each including a "startTime" and "endTime" in milliseconds. I want to group on one field then identify each place in the bag where one datapoint overlaps another in the sense of start/end time. For example, here's some data:

0,A,0,1000
1,A,1500,2000
2,A,1900,3000
3,B,500,2000
4,B,3000,4000
5,B,3500,5000
6,B,7000,8000

which I load and group as follows:

inputdata = LOAD 'inputdata' USING PigStorage(',')
    AS (id:long, where:chararray, start:long, end:long);

grouped = GROUP inputdata BY where;

The ideal result here would be

(1,2)
(4,5)

I have written some bad code to generate an individual tuple for each second with some rounding, then do a set intersection, but this seems hideously inefficient, and in fact it still doesn't quite work. Rather than debug a bad approach, I want to work on a good approach.

How can I reasonably efficiently get tuples like (id1,id2) for the overlapping datapoints?

I am thoroughly comfortable writing a Java UDF to do the work for me, but it seems as though Pig should be able to do this without needing to resort to a custom UDF.
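For comparison, here is a minimal sketch of the per-group logic such a UDF might run. It is plain Java with no Pig plumbing: wrapping it in an `EvalFunc` subclass and unpacking the bag's tuples is left out, and the class and method names are hypothetical. It assumes inclusive endpoints and reports each pair once, earlier start first. The approach is sort-and-scan: sort a group by start time, then from each interval scan forward until the next start passes its end.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper (not a Pig API): the per-group work an overlap UDF could do.
class SortScanOverlaps {
    // One datapoint: id, group key ("where"), start and end in milliseconds.
    static final class Interval {
        final long id;
        final String where;
        final long start;
        final long end;

        Interval(long id, String where, long start, long end) {
            this.id = id;
            this.where = where;
            this.start = start;
            this.end = end;
        }
    }

    // Overlapping pairs within one group, i.e. one bag after GROUP ... BY where.
    // Endpoints are inclusive; each pair is reported once, earlier start first.
    static List<String> overlapsInGroup(List<Interval> group) {
        List<Interval> sorted = new ArrayList<>(group);
        sorted.sort(Comparator.comparingLong((Interval iv) -> iv.start)
                .thenComparingLong(iv -> iv.id));
        List<String> pairs = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            Interval a = sorted.get(i);
            for (int j = i + 1; j < sorted.size(); j++) {
                Interval b = sorted.get(j);
                // The list is sorted by start, so once b starts after a ends,
                // every later interval does too and none of them can overlap a.
                if (b.start > a.end) {
                    break;
                }
                pairs.add("(" + a.id + "," + b.id + ")");
            }
        }
        return pairs;
    }

    // Plays the role of GROUP inputdata BY where, then scans each group.
    static List<String> overlaps(List<Interval> data) {
        Map<String, List<Interval>> groups = data.stream()
                .collect(Collectors.groupingBy((Interval iv) -> iv.where, TreeMap::new, Collectors.toList()));
        List<String> result = new ArrayList<>();
        for (List<Interval> group : groups.values()) {
            result.addAll(overlapsInGroup(group));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Interval> data = List.of(
                new Interval(0, "A", 0, 1000),
                new Interval(1, "A", 1500, 2000),
                new Interval(2, "A", 1900, 3000),
                new Interval(3, "B", 500, 2000),
                new Interval(4, "B", 3000, 4000),
                new Interval(5, "B", 3500, 5000),
                new Interval(6, "B", 7000, 8000));
        System.out.println(overlaps(data)); // prints [(1,2), (4,5)]
    }
}
```

Per group this costs one sort plus one pass, roughly O(n log n + k) for k reported pairs, rather than comparing every pair of records.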


2 Answers


This is not an efficient solution, and I recommend writing a UDF to do this.

Self-join the dataset to get a cross product of all the combinations. In Pig, it's difficult to join a relation with itself, so just load the same data twice as if it were two separate datasets. After the cross product, you end up with data like:

1,A,1500,2000,1,A,1500,2000
1,A,1500,2000,2,A,1900,3000
.....

At this point, keep only the rows that satisfy four conditions:

  • the "where" fields match
  • the two ids from the self-join differ (so you don't get back the same id intersecting with itself)
  • the second record's start time is greater than or equal to the first record's start time
  • the second record's start time is less than or equal to the first record's end time

This code should work, though it may contain a syntax error since I couldn't test it; either way, it should help you write what you need.

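-- Load the same file twice so the data can be paired with itself.
inputdataone = LOAD 'inputdata' USING PigStorage(',')
    AS (id:long, where:chararray, start:long, end:long);

inputdatatwo = LOAD 'inputdata' USING PigStorage(',')
    AS (id:long, where:chararray, start:long, end:long);

-- Every combination of a row from one copy with a row from the other.
crossProduct = CROSS inputdataone, inputdatatwo;

-- Flag pairs with the same "where", different ids, and a second start
-- time that falls within [start, end] of the first record.
crossProduct =
    FOREACH crossProduct
    GENERATE inputdataone::id AS id_one,
        inputdatatwo::id AS id_two,
        (inputdataone::where == inputdatatwo::where
            AND inputdataone::id != inputdatatwo::id
            AND inputdatatwo::start >= inputdataone::start
            AND inputdatatwo::start <= inputdataone::end ? 1 : 0) AS intersect;

find_intersect = FILTER crossProduct BY intersect == 1;

final =
    FOREACH find_intersect
    GENERATE id_one,
        id_two;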
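To check the filter logic outside Pig, the same CROSS-then-FILTER idea can be sketched in plain Java (class and method names are hypothetical). The nested loops stand in for CROSS by visiting all n × n ordered pairs, and the predicate applies the same-"where", different-id, and start-within-range tests with inclusive endpoints.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the CROSS + FILTER approach in plain Java.
class CrossFilterOverlaps {
    // One input row: id, "where", start and end in milliseconds.
    static final class Row {
        final long id;
        final String where;
        final long start;
        final long end;

        Row(long id, String where, long start, long end) {
            this.id = id;
            this.where = where;
            this.start = start;
            this.end = end;
        }
    }

    // Same "where", different ids, and two's start inside [one.start, one.end].
    static boolean intersects(Row one, Row two) {
        return one.where.equals(two.where)
                && one.id != two.id
                && two.start >= one.start
                && two.start <= one.end;
    }

    // The nested loops stand in for CROSS: all n * n ordered pairs are checked.
    static List<String> crossAndFilter(List<Row> rows) {
        List<String> out = new ArrayList<>();
        for (Row one : rows) {
            for (Row two : rows) {
                if (intersects(one, two)) {
                    out.add("(" + one.id + "," + two.id + ")");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row(0, "A", 0, 1000),
                new Row(1, "A", 1500, 2000),
                new Row(2, "A", 1900, 3000),
                new Row(3, "B", 500, 2000),
                new Row(4, "B", 3000, 4000),
                new Row(5, "B", 3500, 5000),
                new Row(6, "B", 7000, 8000));
        System.out.println(crossAndFilter(rows)); // prints [(1,2), (4,5)]
    }
}
```

Because the start comparison uses >=, two records with the same start time are reported in both orders, for example (10,11) and (11,10); the Pig version above behaves the same way.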

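Crossing large sets inflates the data: a CROSS of n records with itself produces n² rows, even though only records with the same "where" value can ever match.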

A naive solution without crossing would be to partition the time range into intervals and check for intersections only within each partition.
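One possible reading of that partitioning idea, sketched in plain Java: split time into fixed-width buckets (the bucket width is a hypothetical tuning parameter, and the class name is an assumption), place each interval in every bucket it touches, and compare only intervals that share both a "where" value and a bucket. Any two overlapping intervals share the bucket containing a common instant, so no overlap is missed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical sketch: find overlaps by bucketing time instead of crossing.
class BucketOverlaps {
    // One datapoint: id, group key ("where"), start and end in milliseconds.
    static final class Interval {
        final long id;
        final String where;
        final long start;
        final long end;

        Interval(long id, String where, long start, long end) {
            this.id = id;
            this.where = where;
            this.start = start;
            this.end = end;
        }
    }

    static List<String> overlaps(List<Interval> data, long bucketWidth) {
        // Key = where + "#" + bucket index. Each interval is copied into every
        // bucket its [start, end] range touches.
        Map<String, List<Interval>> buckets = new HashMap<>();
        for (Interval iv : data) {
            long first = Math.floorDiv(iv.start, bucketWidth);
            long last = Math.floorDiv(iv.end, bucketWidth);
            for (long idx = first; idx <= last; idx++) {
                buckets.computeIfAbsent(iv.where + "#" + idx, key -> new ArrayList<>()).add(iv);
            }
        }
        // Only intervals in the same bucket are compared. A pair sharing several
        // buckets is found several times; the set keeps one copy (string order).
        TreeSet<String> pairs = new TreeSet<>();
        for (List<Interval> bucket : buckets.values()) {
            for (int i = 0; i < bucket.size(); i++) {
                for (int j = i + 1; j < bucket.size(); j++) {
                    Interval a = bucket.get(i);
                    Interval b = bucket.get(j);
                    if (a.start <= b.end && b.start <= a.end) {
                        // Report the earlier-starting interval first (ties: smaller id).
                        boolean aFirst = a.start < b.start || (a.start == b.start && a.id < b.id);
                        Interval x = aFirst ? a : b;
                        Interval y = aFirst ? b : a;
                        pairs.add("(" + x.id + "," + y.id + ")");
                    }
                }
            }
        }
        return new ArrayList<>(pairs);
    }

    public static void main(String[] args) {
        List<Interval> data = List.of(
                new Interval(0, "A", 0, 1000),
                new Interval(1, "A", 1500, 2000),
                new Interval(2, "A", 1900, 3000),
                new Interval(3, "B", 500, 2000),
                new Interval(4, "B", 3000, 4000),
                new Interval(5, "B", 3500, 5000),
                new Interval(6, "B", 7000, 8000));
        System.out.println(overlaps(data, 1000)); // prints [(1,2), (4,5)]
    }
}
```

The width is a trade-off: very wide buckets drift back toward comparing every pair, while very narrow ones copy long intervals into many buckets.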

I am working on a similar problem and will provide a code sample when I am done.