I am using Redis as a queue (via the Spring Integration queue inbound/outbound channel adapters) to distribute tasks (each task is a message put into the queue).

As the throughput is quite high, we observed that, although the messages were sent to the Redis queue, a lot of them were lost and never arrived at the component after the inbound adapter (a header router).

The channel config is attached below. We thought that the problem was in this header router after the inbound adapter: it was unable to keep up with the rate of messages read from the queue, so they were lost.

To fix this we put an intermediate element between the inbound adapter and this component (the header router) and added a queue.

This works fine, but we don't fully understand the solution or whether it is the proper one.

An expert view and opinion about this configuration would be welcome!

Thanks


<!-- a Queue Inbound Channel Adapter is available to 'right pop' messages
     from a Redis List. -->
    <redis:queue-inbound-channel-adapter
     id="fromRedis" channel="in" queue="${name}"
        receive-timeout="1000" recovery-interval="3000" expect-message="true"
            auto-startup="true"/>

    <!-- a queue to avoid lost messages before the header router -->
    <int:channel id="in">
        <int:queue capacity="1000"/>
    </int:channel>

    <!-- a bridge to connect channels and have a poller -->
    <int:bridge input-channel="in" output-channel="out">
        <int:poller fixed-delay="500" />
    </int:bridge>

    <int:header-value-router id="router" timeout="15000"
        input-channel="out" header-name="decision"
        resolution-required="false" default-output-channel="defaultChannel" />

---added on 26/02

To insert messages into Redis we have a web service but, as you said, it simply writes the messages into Redis:

for... channel.send(msg)

Nothing more

About your answer: I am now thinking of removing the in channel and its queue and using the header-value-router directly, but I have more questions:

  1. I think the right solution is a low timeout value on the header-value-router, so that I get the error notification faster if no consumer is available. If I don't set a timeout, it will block indefinitely, which is a bad idea, isn't it?

  2. I don't know how to handle the MessageDeliveryException, because the router doesn't have an error-channel configuration.

  3. I think that if I can handle this error and get the message back, I can re-send it to Redis. Other servers also read messages from Redis and, with luck, one of them could process it.

I am adding my proposed solution below, but it is not complete, and we are not sure about the error handling, as explained above.

<!-- a Queue Inbound Channel Adapter is available to 'right pop' messages
 from a Redis List. -->
<redis:queue-inbound-channel-adapter
 id="fromRedis" channel="in" queue="${name}"
    receive-timeout="1000" recovery-interval="3000" expect-message="true"
        auto-startup="true"/>

 <!-- a header-value-router with quite low timeout -->
   <int:header-value-router id="router" timeout="150"
    input-channel="in" header-name="decision"
    resolution-required="false" default-output-channel="defaultChannel" />

 <!-- What should we do if a MessageDeliveryException is thrown here? -->

<int:channel id="someConsumerHeaderValue">
    <int:dispatcher task-executor="ConsumerExecutor" />
</int:channel>
<!-- If all 5 threads are busy, we queue up to 5 messages; if the queue is full we can increase to 5 more working threads; if there are no more threads, do we get a MessageDeliveryException? -->

<task:executor id="ConsumerExecutor" pool-size="5-5"
               queue-capacity="5" />

1 Answer

Well, it's great to see such an observation. It might help improve the Framework somehow.

So, I'd like to see:

  1. A test case that reproduces this from the Framework's perspective. I guess it is enough to send a lot of messages to Redis and consume them with your config. (Correct me if anything else is needed.)

  2. The downstream flow after the <int:header-value-router>. Note that you use timeout="15000" there, which is a synonym for send-timeout:

Specify the maximum amount of time in milliseconds to wait when sending Messages to the target MessageChannels if blocking is possible (e.g. a bounded queue channel that is currently full). By default the send will block indefinitely. Synonym for 'timeout' - only one can be supplied.

From this I can say that if your downstream consumer is slow enough on some QueueChannel there, you end up in:

/**
 * Inserts the specified element at the tail of this queue, waiting if
 * necessary up to the specified wait time for space to become available.
 *
 * @return {@code true} if successful, or {@code false} if
 *         the specified waiting time elapses before space is available
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public boolean offer(E e, long timeout, TimeUnit unit)
....
while (count.get() == capacity) {
       if (nanos <= 0)
             return false;
       nanos = notFull.awaitNanos(nanos);
}

Pay attention to that return false; it marks exactly the point where the message is lost.

This is also known as the back-pressure drop strategy.

Let me know if you see a different picture there.

You may consider removing that timeout="15000" so that the router behaves the same way as your in queue channel.
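To see that return false in isolation, here is a minimal plain-Java sketch. It is not Spring Integration code, and the class and method names are made up for illustration. It fills a bounded LinkedBlockingQueue that nobody drains and then offers one more element with a timeout:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedOfferDemo {

    /** Fills a bounded queue, then offers one extra element and returns the result of that offer. */
    public static boolean offerToFullQueue(int capacity, long timeoutMillis) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        try {
            for (int i = 0; i < capacity; i++) {
                queue.put("message-" + i); // no consumer ever drains the queue
            }
            // Waits up to timeoutMillis for free space, then gives up and returns false.
            return queue.offer("one-too-many", timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while offering", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted = " + offerToFullQueue(2, 100));
    }
}
```

The extra element is rejected once the timeout elapses, which is the situation a slow consumer behind a bounded queue channel creates.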

UPDATE

Well, error handling works a bit differently. The "guilty" component just throws an Exception, as in plain Java, and it is fine that this component isn't responsible for catching it: that is up to the caller. The caller in our case is the upstream component, <redis:queue-inbound-channel-adapter>.

Any inbound channel adapter has an error-channel option: through the <poller> if it is a MessageSource, or directly on the adapter if it is a MessageProducer.

I'm sure you will be able to handle this:

if (!sent) {
    throw new MessageDeliveryException(message,
            "failed to send message to channel '" + channel + "' within timeout: " + timeout);
}

in that error-channel sub-flow and achieve your requirements for recovery.
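
As a rough analogy only, the "callee throws, caller recovers" idea can be sketched in plain Java without any Spring Integration or Redis API. Everything here is hypothetical: a Deque stands in for the Redis list, a bounded BlockingQueue for the downstream channel, and an IllegalStateException for the MessageDeliveryException. In the real flow the recovery would live in the error-channel sub-flow instead of a catch block:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RequeueOnFailedSend {

    /** Stand-in for the downstream send: throws if the bounded queue stays full for the whole timeout. */
    static void send(BlockingQueue<String> downstream, String message, long timeoutMillis) {
        boolean sent;
        try {
            sent = downstream.offer(message, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            sent = false;
        }
        if (!sent) {
            throw new IllegalStateException(
                    "failed to send '" + message + "' within timeout: " + timeoutMillis);
        }
    }

    /** Stand-in for the caller: pops each pending message once and pushes back the ones that failed. */
    public static int drain(Deque<String> source, BlockingQueue<String> downstream, long timeoutMillis) {
        int requeued = 0;
        int pending = source.size();
        for (int i = 0; i < pending; i++) {
            String message = source.pollLast();   // "right pop" from the source
            try {
                send(downstream, message, timeoutMillis);
            } catch (IllegalStateException e) {
                source.addFirst(message);          // "left push" it back for another consumer
                requeued++;
            }
        }
        return requeued;
    }

    public static void main(String[] args) {
        Deque<String> source = new ArrayDeque<>();
        source.addFirst("a");
        source.addFirst("b");
        source.addFirst("c");
        BlockingQueue<String> downstream = new LinkedBlockingQueue<>(2); // nobody consumes from it
        int requeued = drain(source, downstream, 50);
        System.out.println(requeued + " requeued, " + downstream.size() + " delivered");
    }
}
```

The point of the sketch is only that the message is still available to the caller when the send fails, so it can be put back where it came from.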