
I have the following app requirements:

  • Messages are received from RabbitMQ and then aggregated based on some more complex rules, e.g. based on the types property (with a pre-given type-time mapping) and on how long the message has already been waiting in the queue (the old property).
  • All messages should be released at a variable rate, e.g. from 1 msg/sec up to 100 msg/sec. This rate is controlled and set by a service that monitors the size of a RabbitMQ queue (a queue that is not related to this component and sits further up the pipeline); if there are too many messages in that queue, the service decreases the rate.
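To make the rate requirement concrete, here is a minimal sketch of a holder that clamps the rate to the 1..100 msg/sec range and converts it to a delay between releases. The class name `ReleaseRate`, the threshold parameter, and the "halve the rate" policy are assumptions for illustration only; they are not part of Spring Integration or of the monitoring service described above.

```java
import java.time.Duration;

/** Hypothetical rate holder; the names and the back-off policy are illustrative only. */
final class ReleaseRate {
    static final int MIN_PER_SEC = 1;
    static final int MAX_PER_SEC = 100;

    private int perSecond = MIN_PER_SEC;

    /** Clamp the requested rate into the 1..100 msg/sec range from the requirements. */
    synchronized void set(int requestedPerSecond) {
        perSecond = Math.max(MIN_PER_SEC, Math.min(MAX_PER_SEC, requestedPerSecond));
    }

    synchronized int get() {
        return perSecond;
    }

    /** Delay between two releases: 1000 ms at 1 msg/sec, 10 ms at 100 msg/sec. */
    synchronized Duration period() {
        return Duration.ofMillis(1000L / perSecond);
    }

    /** Assumed policy: halve the rate when the monitored queue exceeds the threshold. */
    synchronized void onQueueDepth(long depth, long threshold) {
        if (depth > threshold) {
            set(perSecond / 2);
        }
    }
}
```

The resulting period could then be fed into whatever trigger drives the release tick.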

The image shows one use case: three messages are already aggregated and are waiting to be released on the next second (since the current rate is 1 msg/sec), but just at that time a MSG with id:10 arrives and updates AGGREGATED 2, making it the first message by priority. So on the next tick, instead of releasing AGGREGATED 3, we release AGGREGATED 2, since it now has the higher priority.

(image not included)
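The use case above can be sketched without any framework. The types `Aggregate` and `ReprioritizingStore` and the numeric priorities are hypothetical; the sketch only assumes that each incoming message carries a priority and that a higher value is released first.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for an aggregated message; not a Spring Integration type. */
final class Aggregate {
    final String name;
    final List<Integer> messageIds = new ArrayList<>();
    int priority = Integer.MIN_VALUE; // assumption: a higher value is released first

    Aggregate(String name) {
        this.name = name;
    }
}

/** Keeps one aggregate per group and always releases the highest-priority one. */
final class ReprioritizingStore {
    private final Map<String, Aggregate> groups = new HashMap<>();

    /** Add a message to its group; the group's priority becomes the highest seen so far. */
    synchronized void add(String group, int messageId, int priority) {
        Aggregate aggregate = groups.computeIfAbsent(group, Aggregate::new);
        aggregate.messageIds.add(messageId);
        aggregate.priority = Math.max(aggregate.priority, priority);
    }

    /** Called once per tick: remove and return the top aggregate, or null if empty. */
    synchronized Aggregate releaseNext() {
        Aggregate top = groups.values().stream()
                .max(Comparator.comparingInt((Aggregate a) -> a.priority))
                .orElse(null);
        if (top != null) {
            groups.remove(top.name);
        }
        return top;
    }
}
```

With three groups at priorities 1, 2 and 3, adding message 10 to AGGREGATED 2 with priority 5 makes `releaseNext()` return AGGREGATED 2 on the next tick and AGGREGATED 3 on the one after.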

Now the question: can I use the Spring Integration Aggregator for this? I do not know whether it supports prioritization of messages during aggregation. I know of groupTimeout, but that only adjusts a single message group and does not change the priority of other groups. Would it be possible to use a MessageGroupStoreReaper that adjusts the priority of all other aggregated messages when a new MSG arrives?

UPDATE

I implemented something like the following, and it seems OK for now: it aggregates messages as they arrive, and the comparator sorts messages by my custom logic.

Do you think there could be some problems with this (concurrency etc.)? I can see in the logs that the poller is sometimes invoked more than once. Is this normal?

2021-01-18 13:52:05.277  INFO 16080 --- [   scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277  INFO 16080 --- [   scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277  INFO 16080 --- [   scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277  INFO 16080 --- [   scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL

Also, is the commented-out doit method the proper way to increase the maximum number of polled messages at runtime?

@Bean
    public MessageChannel aggregatingChannel(){
        return new QueueChannel(new PriorityAggregatingQueue<>((m1, m2) -> { /* aggr here */ },
                Comparator.comparingInt(x -> x),
                (m) -> {
                    ExampleDTO d = (ExampleDTO) m.getPayload();
                    return d.getId();
                }
        ));
    }

    class PriorityAggregatingQueue<K> extends AbstractQueue<Message<?>> {
        private final Log logger = LogFactory.getLog(getClass());
        private final BiFunction<Message<?>, Message<?>, Message<?>> accumulator;
        private final Function<Message<?>, K> keyExtractor;
        private final NavigableMap<K, Message<?>> keyToAggregatedMessage;

        public PriorityAggregatingQueue(BiFunction<Message<?>, Message<?>, Message<?>> accumulator,
                                        Comparator<? super K> comparator,
                                        Function<Message<?>, K> keyExtractor) {
            this.accumulator = accumulator;
            this.keyExtractor = keyExtractor;
            keyToAggregatedMessage = new ConcurrentSkipListMap<>(comparator);
        }

        @Override
        public Iterator<Message<?>> iterator() {
            return keyToAggregatedMessage.values().iterator();
        }

        @Override
        public int size() {
            return keyToAggregatedMessage.size();
        }

        @Override
        public boolean offer(Message<?> m) {
            logger.info("OFFER");
            return keyToAggregatedMessage.compute(keyExtractor.apply(m), (k,old) -> accumulator.apply(old, m)) != null;
        }

        @Override
        public Message<?> poll() {
            logger.info("POLL");
            Map.Entry<K, Message<?>> m = keyToAggregatedMessage.pollLastEntry();
            return m != null ? m.getValue() : null;
        }

        @Override
        public Message<?> peek() {
            Map.Entry<K, Message<?>> m = keyToAggregatedMessage.lastEntry();
            return m!= null ? m.getValue() : null;
        }
    }

//    @Scheduled(fixedDelay = 10*1000)
//    public void doit(){
//        System.out.println("INCREASE POLL");
//        pollerMetadata().setMaxMessagesPerPoll(pollerMetadata().getMaxMessagesPerPoll() * 2);
//    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata pollerMetadata(){
        PollerMetadata metadata = new PollerMetadata();
        metadata.setTrigger(new DynamicPeriodicTrigger(Duration.ofSeconds(30)));
        metadata.setMaxMessagesPerPoll(1);
        return metadata;
    }

    @Bean
    public IntegrationFlow aggregatingFlow(
            AmqpInboundChannelAdapter aggregatorInboundChannel,
            AmqpOutboundEndpoint aggregatorOutboundChannel,
            MessageChannel wtChannel,
            MessageChannel aggregatingChannel,
            PollerMetadata pollerMetadata
    ) {
    return IntegrationFlows.from(aggregatorInboundChannel)
        .wireTap(wtChannel)
        .channel(aggregatingChannel)
        .handle(aggregatorOutboundChannel)
        .get();
    }

1 Answer


Well, if a new message arrives at the aggregator and completes its group, then that group is released immediately (provided your ReleaseStrategy says so). The rest of the groups under timeout will continue to wait for the schedule.

It is probably possible to come up with a smart algorithm that relies on a single common schedule with the MessageGroupStoreReaper to decide whether we need to release a partial group or just discard it. Again: the ReleaseStrategy should give us a clue whether to release or not, even if the group is partial. When a discard happens and we want to keep those messages in the aggregator, we need to resend them back to the aggregator after some delay. After expiration the group is removed from the store, and this happens after we have already sent its messages to the discard channel, so it is better to delay them and let the aggregator clean up those groups; after the delay we can safely send them back to the aggregator for a new expiration period as parts of new groups.

You can probably also iterate over all of the messages in the store after a normal group is released, to adjust some time key in their headers for the next expiration time.
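As a rough illustration of that last idea, here is a framework-free sketch. `ExpiringGroups` is a hypothetical in-memory stand-in, not the Spring `MessageGroupStore` API, and the "time key" is modeled as a plain epoch-millis value per group rather than a message header.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a group store; not a Spring Integration type. */
final class ExpiringGroups {
    // group id -> time (epoch millis) at which the group is next considered for release
    private final Map<String, Long> nextExpiration = new LinkedHashMap<>();

    synchronized void put(String groupId, long expiresAtMillis) {
        nextExpiration.put(groupId, expiresAtMillis);
    }

    /** Release one group, then move every remaining group's expiration to now + period. */
    synchronized void releaseAndReschedule(String releasedGroupId, long nowMillis, long periodMillis) {
        nextExpiration.remove(releasedGroupId);
        nextExpiration.replaceAll((groupId, oldExpiration) -> nowMillis + periodMillis);
    }

    /** Returns the stored expiration time, or null if the group is unknown or released. */
    synchronized Long expirationOf(String groupId) {
        return nextExpiration.get(groupId);
    }
}
```

In a real aggregator the rescheduling rule would depend on each group's priority; the uniform `now + period` here is only a placeholder.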

I know this is a hard matter, but there is really no out-of-the-box solution, since the aggregator was not designed to affect groups other than the one we just dealt with...
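On the concurrency question about the custom queue in the update: the Javadoc of `ConcurrentSkipListMap.compute` states that the function is not guaranteed to be applied once atomically, so an accumulator with side effects could run more than once under contention. One possible way around that, sketched below under assumptions, is to guard a plain `TreeMap` with a lock. The class `LockedAggregatingQueue` is hypothetical and generic over the element type instead of using `Message<?>`, so it is not a drop-in replacement for the queue in the question.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Hypothetical, Spring-free variant of the aggregating queue, guarded by a lock. */
final class LockedAggregatingQueue<K, V> {
    private final TreeMap<K, V> byKey;
    private final Function<V, K> keyExtractor;
    private final BinaryOperator<V> accumulator;

    LockedAggregatingQueue(Comparator<? super K> comparator,
                           Function<V, K> keyExtractor,
                           BinaryOperator<V> accumulator) {
        this.byKey = new TreeMap<>(comparator);
        this.keyExtractor = keyExtractor;
        this.accumulator = accumulator;
    }

    /** Merge the value into the aggregate for its key; the accumulator runs under the lock. */
    synchronized void offer(V value) {
        byKey.merge(keyExtractor.apply(value), value, accumulator);
    }

    /** Remove and return the aggregate with the greatest key, or null when empty. */
    synchronized V poll() {
        Map.Entry<K, V> last = byKey.pollLastEntry();
        return last == null ? null : last.getValue();
    }

    synchronized int size() {
        return byKey.size();
    }
}
```

Unlike `compute` in the question, `merge` only calls the accumulator when a value already exists for the key, so the accumulator never sees a null first argument.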