I have following app requirements:
- Messages are received from RabbitMq, and then aggregated based on some more complex rules e.g. - based on
types
property (with pre-given type-time mapping) and based on existing time message has been waiting in the queue (old
property) - All messages should be released at some variable message rate, e.g. 1msg/sec up to 100msg/sec. This rate is controlled and set by service that will monitor rabbitmq queue size (one queue that is not related to this component, and is further up the pipeline) and if too much messages are in queue - would decrease the rate.
As you can see in the image one use-case: three messages are already aggregated and are waiting to be released next second (since current rate is 1msg/sec
), but just at that time, MSG
arrives with id:10
, and it updated AGGREGATED 2
, making it become 1st message by priority. So on next tick, instead of releasing AGGREGATED 3
, we release AGGREGATED 2
since it has now higher priority.
Now, the question is - can I use Spring Integration Aggregator for this, since I do not know if it supports prioritization of messages during aggregation? I know of groupTimeout
, but that one is only adjusting single message group - not changing priority of other groups. Would it be possible to use MessageGroupStoreReaper
that would adjust all other aggregated messages by priority when new MSG arrives?
UPDATE
I did some implementation like this - seems OK for now - it is aggregating messages as it arrives, and comparator is sorting messages by my custom logic.
Do you think there could be some problems with this (concurrency etc.)? I can see in the logs, that poller is invoked more than once on occations. Is this normal?
2021-01-18 13:52:05.277 INFO 16080 --- [ scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277 INFO 16080 --- [ scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277 INFO 16080 --- [ scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277 INFO 16080 --- [ scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
Also, is this commented doit
method, proper way to increase max number of polled messages in runtime?
@Bean
public MessageChannel aggregatingChannel(){
return new QueueChannel(new PriorityAggregatingQueue<>((m1, m2) -> {//aggr here},
Comparator.comparingInt(x -> x),
(m) -> {
ExampleDTO d = (ExampleDTO) m.getPayload();
return d.getId();
}
));
}
class PriorityAggregatingQueue<K> extends AbstractQueue<Message<?>> {
private final Log logger = LogFactory.getLog(getClass());
private final BiFunction<Message<?>, Message<?>, Message<?>> accumulator;
private final Function<Message<?>, K> keyExtractor;
private final NavigableMap<K, Message<?>> keyToAggregatedMessage;
public PriorityAggregatingQueue(BiFunction<Message<?>, Message<?>, Message<?>> accumulator,
Comparator<? super K> comparator,
Function<Message<?>, K> keyExtractor) {
this.accumulator = accumulator;
this.keyExtractor = keyExtractor;
keyToAggregatedMessage = new ConcurrentSkipListMap<>(comparator);
}
@Override
public Iterator<Message<?>> iterator() {
return keyToAggregatedMessage.values().iterator();
}
@Override
public int size() {
return keyToAggregatedMessage.size();
}
@Override
public boolean offer(Message<?> m) {
logger.info("OFFER");
return keyToAggregatedMessage.compute(keyExtractor.apply(m), (k,old) -> accumulator.apply(old, m)) != null;
}
@Override
public Message<?> poll() {
logger.info("POLL");
Map.Entry<K, Message<?>> m = keyToAggregatedMessage.pollLastEntry();
return m != null ? m.getValue() : null;
}
@Override
public Message<?> peek() {
Map.Entry<K, Message<?>> m = keyToAggregatedMessage.lastEntry();
return m!= null ? m.getValue() : null;
}
}
// @Scheduled(fixedDelay = 10*1000)
// public void doit(){
// System.out.println("INCREASE POLL");
// pollerMetadata().setMaxMessagesPerPoll(pollerMetadata().getMaxMessagesPerPoll() * 2);
// }
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata pollerMetadata(){
PollerMetadata metadata = new PollerMetadata();
metadata.setTrigger(new DynamicPeriodicTrigger(Duration.ofSeconds(30)));
metadata.setMaxMessagesPerPoll(1);
return metadata;
}
@Bean
public IntegrationFlow aggregatingFlow(
AmqpInboundChannelAdapter aggregatorInboundChannel,
AmqpOutboundEndpoint aggregatorOutboundChannel,
MessageChannel wtChannel,
MessageChannel aggregatingChannel,
PollerMetadata pollerMetadata
) {
return IntegrationFlows.from(aggregatorInboundChannel)
.wireTap(wtChannel)
.channel(aggregatingChannel)
.handle(aggregatorOutboundChannel)
.get();
}