
Hi, we are trying to stream-process financial market data to calculate trading signals using Apache Camel or Spring Integration. One of our use cases is to aggregate consecutive prices based on their timestamps, as follows:

  • input

The input messages arrive as a time series of (timestamp, price) pairs. Each pair (TX,PX) is one message, where T is the timestamp and P is the price. Suppose the values arrive as:

(T0,P1),(T1,P1),(T2,P2),(T3,P3),(T4,P4)... 
  • aggregation

Suppose we need to aggregate every 3 consecutive messages for further calculation. Given the input above, we need to produce the following groups, where each group of 3 pairs is one aggregated message:

[(T0,P1),(T1,P1),(T2,P2)],
[(T1,P1),(T2,P2),(T3,P3)],
[(T2,P2),(T3,P3),(T4,P4)],
....
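
To make the desired grouping concrete, here is a minimal plain-Java sketch of a sliding window of size 3 that advances one message at a time. It uses no Camel or Spring Integration classes; the class and method names (SlidingWindows, windows) are hypothetical and only illustrate the expected output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SlidingWindows {

    /** Returns every run of `size` consecutive elements, advancing one element at a time. */
    static <T> List<List<T>> windows(List<T> input, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start + size <= input.size(); start++) {
            // Copy the sub-list so each window is independent of the input list.
            result.add(new ArrayList<>(input.subList(start, start + size)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> ticks = Arrays.asList("(T0,P1)", "(T1,P1)", "(T2,P2)", "(T3,P3)", "(T4,P4)");
        // Prints one group of 3 consecutive pairs per line.
        windows(ticks, 3).forEach(System.out::println);
    }
}
```

For the five input pairs above this yields exactly the three groups listed; a streaming version would emit each window as soon as its third message arrives.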

As you can see, most messages will be aggregated into more than one group. Can someone suggest a way to do this with the existing aggregator, without writing a custom one?

It seems that Spring Integration's aggregator also groups messages by correlation key, so each message would need to map to several correlation keys. However, the current API seems to allow only one correlation key per message, which means each message can be aggregated into only one group. Is there any workaround for this?
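
The mismatch with a single correlation key can be sketched in plain Java. If each window is identified by the index of its first message, a message at position i belongs to up to 3 windows. The names below (WindowKeys, windowStartsFor) are hypothetical and are not part of either framework.

```java
import java.util.ArrayList;
import java.util.List;

class WindowKeys {

    /** Start indices of all windows of length `size` that contain the message at `index` (0-based). */
    static List<Integer> windowStartsFor(int index, int size) {
        List<Integer> starts = new ArrayList<>();
        // The earliest window containing `index` starts at index - size + 1 (never below 0).
        for (int start = Math.max(0, index - size + 1); start <= index; start++) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            System.out.println("T" + i + " -> windows starting at " + windowStartsFor(i, 3));
        }
    }
}
```

From the third message onward every message maps to three window keys, which a strategy returning a single key cannot express.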

P.S.

After reading the Camel source code, it seems Camel cannot support our requirement, so I am trying my luck with Spring. Fingers crossed.

Sorry - I misread your question. - Gary Russell
I fixed my answer with a possible solution. - Gary Russell

1 Answer


We don't have anything out of the box, but I was able to do what you want with a small modification to the SimpleMessageStore. I have posted the full RollingMessageStore in a gist.

The bottom line is to modify removeGroup to only remove the first message, and not the whole group. Also, make completeGroup a no-op.

Set expireGroupsUponCompletion to true to force the aggregator to "remove" the group (by calling the modified removeGroup() method).

Here is a diff between SimpleMessageStore and RollingMessageStore...

182,184c190,194
< 
<               groupUpperBound.release(groupIdToMessageGroup.get(groupId).size());
<               groupIdToMessageGroup.remove(groupId);
---
>               Message<?> message = this.groupIdToMessageGroup.get(groupId).getOne();
>               if (message != null) {
>                   this.groupUpperBound.release(1);
>                   this.removeMessageFromGroup(groupId, message);
>               }

(plus remove all the code in completeGroup()).
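
The effect of that change can be simulated without Spring Integration. The following is only a sketch of the idea, not the RollingMessageStore itself: the class name RollingGroupSketch and its handle method are hypothetical, the group is released when it holds 3 payloads, and "removing the group" drops only the oldest payload. Multiplication stands in for whatever processing the aggregator performs.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class RollingGroupSketch {

    private final Deque<Integer> group = new ArrayDeque<>();
    private final int windowSize;

    RollingGroupSketch(int windowSize) {
        this.windowSize = windowSize;
    }

    /** Adds a payload; returns the product of the window if it is released, otherwise null. */
    Integer handle(int payload) {
        group.addLast(payload);
        if (group.size() < windowSize) {
            return null; // release strategy: release only when size() == windowSize
        }
        int product = 1;
        for (int value : group) {
            product *= value; // stand-in for the aggregator's output processor
        }
        group.removeFirst(); // modified "removeGroup": drop only the oldest payload
        return product;
    }

    public static void main(String[] args) {
        RollingGroupSketch aggregator = new RollingGroupSketch(3);
        for (int payload : new int[] {3, 5, 7, 9, 11}) {
            Integer released = aggregator.handle(payload);
            if (released != null) {
                System.out.println(released);
            }
        }
    }
}
```

Fed the payloads 3, 5, 7, 9, 11, the sketch releases 105, 315 and 693, one result per overlapping window of three.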

and a test case...

@Test
public void testRolling() {
    AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new MultiplyingProcessor(), new RollingMessageStore());
    aggregator.setExpireGroupsUponCompletion(true);
    aggregator.setReleaseStrategy(new ReleaseStrategy() {

        @Override
        public boolean canRelease(MessageGroup group) {
            return group.size() == 3;
        }
    });
    QueueChannel replyChannel = new QueueChannel();
    Message<?> message1 = createMessage(3, "ABC", 3, 1, replyChannel, null);
    Message<?> message2 = createMessage(5, "ABC", 3, 2, replyChannel, null);
    Message<?> message3 = createMessage(7, "ABC", 3, 3, replyChannel, null);
    Message<?> message4 = createMessage(9, "ABC", 3, 3, replyChannel, null);
    Message<?> message5 = createMessage(11, "ABC", 3, 3, replyChannel, null);

    aggregator.handleMessage(message1);
    aggregator.handleMessage(message2);
    aggregator.handleMessage(message3);
    aggregator.handleMessage(message4);
    aggregator.handleMessage(message5);

    // Each reply is the product of one rolling window of 3 payloads.
    Message<?> reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(105, reply.getPayload()); // 3 * 5 * 7
    reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(315, reply.getPayload()); // 5 * 7 * 9
    reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(693, reply.getPayload()); // 7 * 9 * 11
}

Please go ahead and open a JIRA New Feature Issue and we'll look at adding this (or a more general solution) to the upcoming 3.0 release.

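Use correlation-strategy-expression="'foo'" and release-strategy-expression="size() == 3" on the aggregator, so that every message lands in the same group and the group is released once it holds 3 messages.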