I have a service that consumes messages off of a queue at a rate that I control. I do some processing and then attempt to write to a Cassandra cluster via the Datastax Java client. I have setup my Cassandra cluster with maxRequestsPerConnection
and maxConnectionsPerHost
. However, in testing I have found that when I have reached maxConnectionsPerHost
and maxRequestsPerConnection
calls to session.executeAsync
don't block.
What I am doing right now is using a new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection)
and incrementing it before every async request and decrementing it when the future returned by executeAsync
completes. This works well enough, but it seems redundant since the driver is already tracking requests and connections internally.
Has anyone come up with a better solution to this problem?
One caveat: I would like a request to be considered outstanding until it has completed. This includes retries! The situation where I am getting retryable failures from the cluster (such as timeouts waiting for consistency) is primary situation where I want to backpressure and stop consuming messages from the queue.
Problem:
// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
// this appears to return immediately even if I have exhausted connections/requests
session.executeAsync(preparedStatement.bind(...));
}
Current solution:
constructor() {
this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}
processMessage(message) {
ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
concurrentRequestsSemaphore.acquireUninterruptibly();
future.whenComplete((result, exception) -> concurrentRequests.release());
}
Also, can anyone see any obvious problems with this solution?