
I have rows in a big-data store (Cassandra, in my case) with the columns col1, col2, col3, val1 and val2.

With SQL I can GROUP BY col1, col2 or col2, col1, or in any other order, which makes it easy to build a tree hierarchy.

But we now store the data in Cassandra, which doesn't support GROUP BY, so we want to use Storm for the grouping and aggregation. We wrote some sample code to do the aggregation and grouping, but we can't yet tell whether this approach will work.

The data looks like this:

col1,col2,col3,val1,val2
------------------------
a1,b1,c1,10,20
a1,b1,c2,11,13
a1,b2,c1,9,15
a1,b2,c3,13,88
a2,b1,c1,30,44
a2,b3,c2,22,33
a4,b4,c4,99,66

Like an Excel pivot table, I want to build the hierarchy root->child1->child2->child3->val1,val2. If my hierarchy is col1->col2->col3, it would look like this (each {x,y} is the val1,val2 subtotal of everything below that node):

a1          {43,136}
    --b1        {21,33}
        --c1    10,20
        --c2    11,13
    --b2        {22,103}
        --c1    9,15
        --c3    13,88
a2          {52,77}
    --b1        {30,44}
        --c1    30,44
    --b3        {22,33}
        --c2    22,33
a4          {99,66}
    --b4        {99,66}
        --c4    99,66

I want to let users rearrange the hierarchy levels dynamically, for example to col3->col1->col2 (or any other order). In that case the data would look like this:

c1          {49,79}
    --a1        {19,35}
        --b1    10,20
        --b2    9,15
    --a2        {30,44}
        --b1    30,44
c2          {33,46}
    --a1        {11,13}
        --b1    11,13
    --a2        {22,33}
        --b3    22,33
c3          {13,88}
    --a1        {13,88}
        --b2    13,88
c4          {99,66}
    --a4        {99,66}
        --b4    99,66
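Independent of Storm, the reordering itself is straightforward once the grouped rows are available: walk each row down the chosen column order and add its measures to every node on that path, so each node ends up holding its subtotal. The following is a minimal plain-Java sketch (Java 16+ for records) that assumes the whole result fits in memory; the names PivotTree, Row, Node, build and render are illustrative only and are not part of Storm or Trident.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PivotTree {
    // One input row: dimension values keyed by column name, plus two measures.
    record Row(Map<String, String> dims, long val1, long val2) {}

    // A tree node holding the val1/val2 subtotal of every row beneath it.
    static final class Node {
        long val1, val2;
        final Map<String, Node> children = new TreeMap<>(); // keys kept sorted
    }

    // Walks each row down the path given by `order`, adding its measures to
    // every node on the way, so each node ends up holding its subtotal.
    static Node build(List<Row> rows, List<String> order) {
        Node root = new Node();
        for (Row r : rows) {
            Node cur = root;
            for (String col : order) {
                cur = cur.children.computeIfAbsent(r.dims().get(col), k -> new Node());
                cur.val1 += r.val1();
                cur.val2 += r.val2();
            }
        }
        return root;
    }

    // Prints the tree with four spaces of indentation per level.
    static void render(Node node, int depth, StringBuilder out) {
        for (Map.Entry<String, Node> e : node.children.entrySet()) {
            Node child = e.getValue();
            out.append("    ".repeat(depth))
               .append(depth == 0 ? "" : "--")
               .append(e.getKey())
               .append(" {").append(child.val1).append(',').append(child.val2).append("}\n");
            render(child, depth + 1, out);
        }
    }

    static Row row(String c1, String c2, String c3, long v1, long v2) {
        return new Row(Map.of("col1", c1, "col2", c2, "col3", c3), v1, v2);
    }

    // The sample rows from the question.
    static List<Row> sampleRows() {
        return List.of(
            row("a1", "b1", "c1", 10, 20), row("a1", "b1", "c2", 11, 13),
            row("a1", "b2", "c1", 9, 15),  row("a1", "b2", "c3", 13, 88),
            row("a2", "b1", "c1", 30, 44), row("a2", "b3", "c2", 22, 33),
            row("a4", "b4", "c4", 99, 66));
    }

    public static void main(String[] args) {
        StringBuilder out = new StringBuilder();
        // Any permutation of the dimension columns works here.
        render(build(sampleRows(), List.of("col3", "col1", "col2")), 0, out);
        System.out.print(out);
    }
}
```

Re-rooting the hierarchy only means passing a different column list to build; with col3, col1, col2 the c2 node gets the subtotal {33,46} and c1 -> a1 gets {19,35}. In a Storm setup, a sketch like this would sit after the aggregation (for example in the serving layer), not inside the topology.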

Here are a few lines of my Trident code, which doesn't work as expected:

topology.newStream("aggregation", spout)
.groupBy(new Fields("col1","col2","col3","val1","val2"))
.aggregate(new Fields("val1","val2"), new Sum(), new Fields("val1sum","val2sum"))
.each(new Fields("col1","col2","col3","val1sum","val2sum"), new Utils.PrintFilter());

I want to do the above transformations with Storm, with or without the Trident API. Can anyone guide me on how to achieve this? Any program ideas are very much appreciated.


1 Answer


You should include only the dimensions (your col1, col2 and col3) and NOT the measures (your val1, val2) in the groupBy. And when you need to aggregate multiple measures, use the chainedAgg() construct. The following is the changed topology code for your use case:

topology.newStream("aggregation", spout)
    // group by dimensions only, never by measures
    .groupBy(new Fields("col1", "col2"))
    // one Sum aggregator per measure, evaluated together for each group
    .chainedAgg()
    .aggregate(new Fields("val1"), new Sum(), new Fields("val1sum"))
    .aggregate(new Fields("val2"), new Sum(), new Fields("val2sum"))
    .chainEnd()
    .each(new Fields("col1", "col2", "val1sum", "val2sum"), new Utils.PrintFilter());

It produces the following output, which matches the col1->col2 subtotals you expected:

PartitionId=0, [a1, b1, 21, 33]
PartitionId=0, [a1, b2, 22, 103]
PartitionId=0, [a4, b4, 99, 66]
PartitionId=0, [a2, b1, 30, 44]
PartitionId=0, [a2, b3, 22, 33]
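Both points of this answer can be checked without a running topology. The plain-Java sketch below groups the question's rows by a chosen set of column indexes and keeps a separate running total for val1 and val2, which is the effect the two chained Sum aggregators have per group. GroupByDemo and groupAndSum are illustrative names, not Trident API. With all five fields in the key, as in the question's groupBy, every row stays in its own group and nothing is summed (keying on measures can only split groups further, never merge them); with only col1 and col2 the rows collapse into the same five subtotals shown above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupByDemo {
    // Columns: col1, col2, col3, val1, val2 (the sample rows from the question).
    static final String[][] ROWS = {
        {"a1", "b1", "c1", "10", "20"}, {"a1", "b1", "c2", "11", "13"},
        {"a1", "b2", "c1", "9", "15"},  {"a1", "b2", "c3", "13", "88"},
        {"a2", "b1", "c1", "30", "44"}, {"a2", "b3", "c2", "22", "33"},
        {"a4", "b4", "c4", "99", "66"}};

    // Groups rows by the given column indexes and keeps one running sum per
    // measure (val1 at index 3, val2 at index 4) for each group.
    static Map<List<String>, long[]> groupAndSum(int... keyCols) {
        Map<List<String>, long[]> groups = new LinkedHashMap<>(); // first-seen order
        for (String[] r : ROWS) {
            List<String> key = new ArrayList<>();
            for (int c : keyCols) key.add(r[c]);
            long[] sums = groups.computeIfAbsent(key, k -> new long[2]);
            sums[0] += Long.parseLong(r[3]);
            sums[1] += Long.parseLong(r[4]);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Measures in the key, as in the question: one group per row here.
        System.out.println(groupAndSum(0, 1, 2, 3, 4).size());
        // Dimensions only, as in the answer: rows collapse into subtotals.
        groupAndSum(0, 1).forEach((k, v) -> System.out.println(k + " " + v[0] + "," + v[1]));
    }
}
```

Calling groupAndSum(0) or groupAndSum(0, 1, 2) gives the other levels of the col1->col2->col3 tree, which suggests one groupBy per hierarchy prefix if every level is needed.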

Cheers!

MK