6
votes

I have a service that consumes messages off a queue at a rate that I control. I do some processing and then attempt to write to a Cassandra cluster via the DataStax Java client. I have set up my Cassandra cluster with maxRequestsPerConnection and maxConnectionsPerHost. However, in testing I have found that once I have reached maxConnectionsPerHost and maxRequestsPerConnection, calls to session.executeAsync don't block.

What I am doing right now is using a new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection), acquiring a permit before every async request and releasing it when the future returned by executeAsync completes. This works well enough, but it seems redundant since the driver is already tracking requests and connections internally.

Has anyone come up with a better solution to this problem?

One caveat: I would like a request to be considered outstanding until it has completed. This includes retries! The situation where I am getting retryable failures from the cluster (such as timeouts waiting for consistency) is the primary situation where I want to apply backpressure and stop consuming messages from the queue.

Problem:

// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
    // this appears to return immediately even if I have exhausted connections/requests
    session.executeAsync(preparedStatement.bind(...));
}

Current solution:

constructor() {
    this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}

processMessage(message) {
    // block here until a permit is free, before the request is sent
    concurrentRequestsSemaphore.acquireUninterruptibly();
    ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
    CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
    // return the permit once the request completes or fails
    future.whenComplete((result, exception) -> concurrentRequestsSemaphore.release());
}

Also, can anyone see any obvious problems with this solution?

2
I think your use case as described would greatly benefit from RxJava; it makes retries trivial and backpressure doable. - Tassos Bassoukos

2 Answers

5
votes

One way to avoid killing the cluster is to throttle your calls to executeAsync: after a batch of 100 (or whatever number works best for your cluster and workload), pause in the client code and make a blocking call on all 100 futures (or use the Guava library, e.g. Futures.allAsList, to transform a list of futures into a future of a list).

This way, after issuing 100 async queries, you force the client application to wait for all of them to complete before proceeding further. If you catch an exception when calling future.get(), you can schedule a retry. Normally a retry has already been attempted by the default RetryPolicy of the Java driver.
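A minimal sketch of the batch-and-wait idea follows. To keep it self-contained it uses the JDK's CompletableFuture in place of the driver's ResultSetFuture, and the class name BatchThrottle, the method writeInBatches, and the write function are hypothetical names chosen for illustration. In real code the write function would wrap session.executeAsync.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class BatchThrottle {
    /**
     * Sends items in batches of batchSize and waits for every future in a
     * batch to finish before starting the next batch.
     * Returns the items whose write failed, so the caller can retry them.
     */
    public static <T> List<T> writeInBatches(List<T> items, int batchSize,
                                             Function<T, CompletableFuture<?>> write) {
        List<T> failed = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            List<T> batch = items.subList(start, Math.min(start + batchSize, items.size()));

            // Fire off the whole batch asynchronously.
            List<CompletableFuture<?>> futures = new ArrayList<>();
            for (T item : batch) {
                futures.add(write.apply(item));
            }

            // Block until each future in the batch has completed.
            for (int i = 0; i < batch.size(); i++) {
                try {
                    futures.get(i).join();
                } catch (RuntimeException e) { // CompletionException or CancellationException
                    failed.add(batch.get(i));
                }
            }
        }
        return failed;
    }
}
```

The trade-off of this design is that one slow request holds up the start of the next batch, which is what the semaphore approach in the question avoids.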

As for a back-pressure signal from the server: starting with CQL binary protocol V3, there is an error code that notifies the client that the coordinator is overloaded: https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v3.spec#L951

From the client, you can get this overloaded information in 2 ways:

2
votes

What I am doing right now is using a new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection), acquiring a permit before every async request and releasing it when the future returned by executeAsync completes. This works well enough, but it seems redundant since the driver is already tracking requests and connections internally.

That is a pretty reasonable approach that allows new requests to fill in while other ones complete. You can tie releasing a permit to the future's completion.

The reason why the driver doesn't do this itself is that it tries to do as little blocking as possible and instead fails fast. Unfortunately this pushes some responsibility to the client.

In the usual case it is not good to send that many requests to a host simultaneously. C* has a native_transport_max_threads setting (default 128) that controls the number of threads handling requests at a time. It would be better to throttle yourself at 2 * that number per host. (See "How Cassandra handle blocking execute statement in datastax java driver" for more detail.)
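To make the arithmetic concrete, here is a rough sketch of sizing the permit count. The class and method names are hypothetical. Capping at the driver's pool capacity and multiplying by the number of hosts are my own assumptions (the latter only holds if load is spread evenly across hosts), not something the driver prescribes.

```java
public class PermitSizing {
    /**
     * Per-host cap: 2 * native_transport_max_threads, but never more than
     * the driver's pool could carry (connections * requests per connection).
     */
    public static int perHostLimit(int nativeTransportMaxThreads,
                                   int maxConnectionsPerHost,
                                   int maxRequestsPerConnection) {
        int serverSide = 2 * nativeTransportMaxThreads;
        int poolCapacity = maxConnectionsPerHost * maxRequestsPerConnection;
        return Math.min(serverSide, poolCapacity);
    }

    /** Cluster-wide permit count, assuming requests are spread evenly over hostCount hosts. */
    public static int totalLimit(int nativeTransportMaxThreads,
                                 int maxConnectionsPerHost,
                                 int maxRequestsPerConnection,
                                 int hostCount) {
        return hostCount * perHostLimit(nativeTransportMaxThreads,
                                        maxConnectionsPerHost,
                                        maxRequestsPerConnection);
    }
}
```

With the default of 128 threads, 8 connections per host and 1024 requests per connection, the per-host limit works out to 256 rather than the 8192 the pool settings alone would allow.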

I would like a request to be considered outstanding until it has completed. This includes retries! The situation where I am getting retryable failures from the cluster (such as timeouts waiting for consistency) is the primary situation where I want to apply backpressure and stop consuming messages from the queue.

The driver will not complete the future until the request has completed successfully, exhausted its retries, or failed for some other reason. Therefore you can tie the release of the semaphore permit to the future completing or failing.
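A minimal sketch of that pattern, assuming the driver's future has already been converted to a CompletableFuture (as the question does with its own completableFromListenable helper). The class name RequestThrottle and its methods are hypothetical, and the request is passed in as a Supplier so the example runs without the driver on the classpath.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Caps the number of in-flight async requests; submit() blocks once the cap is reached. */
public class RequestThrottle {
    private final Semaphore permits;

    public RequestThrottle(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /**
     * Blocks until a permit is free, then starts the request.
     * The permit is returned when the future completes, successfully or not.
     */
    public <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> request) {
        permits.acquireUninterruptibly();      // take the permit BEFORE sending
        CompletableFuture<T> future;
        try {
            future = request.get();            // e.g. a wrapped session.executeAsync(...)
        } catch (RuntimeException e) {
            permits.release();                 // the request never started
            throw e;
        }
        future.whenComplete((result, error) -> permits.release());
        return future;
    }

    /** Number of requests that could be started right now without blocking. */
    public int availablePermits() {
        return permits.availablePermits();
    }
}
```

Because the permit is acquired before the request is sent, the thread consuming the queue stalls in submit() whenever the cluster falls behind, which is the backpressure the question asks for.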