I am using DataStax Java driver 3.1.0 to connect to a Cassandra cluster, and my Cassandra cluster version is 2.0.10. I am writing asynchronously with QUORUM consistency.
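import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// session, logger and CacheStatement are defined elsewhere in my class.
private final ExecutorService executorService = Executors.newFixedThreadPool(10);

public void save(String process, int clientid, long deviceid) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
        // Bind the cached prepared statement and write at QUORUM.
        BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
        bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
        bs.setString(0, process);
        bs.setInt(1, clientid);
        bs.setLong(2, deviceid);
        // Fire-and-forget: nothing limits how many of these requests are in flight.
        ResultSetFuture future = session.executeAsync(bs);
        Futures.addCallback(future, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet result) {
                logger.logInfo("successfully written");
            }

            @Override
            public void onFailure(Throwable t) {
                logger.logError("error= ", t);
            }
        }, executorService);
    } catch (Exception ex) {
        logger.logError("error= ", ex);
    }
}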
The save method above is called from multiple threads at a very high rate.
Question:
I want to throttle the requests to the executeAsync method, which writes asynchronously into Cassandra. If I write faster than my Cassandra cluster can handle, it will start throwing errors, and I want all of my writes to go into Cassandra successfully without any loss.
I saw this post where the solution is to use a Semaphore with a fixed number of permits, but I am not sure how to implement that or what the best way is. I have never used a Semaphore before. Can anyone provide an example using a Semaphore based on my code? If there is a better way or option, please let me know as well. This is the logic described in that post:
In the context of writing a dataloader program, you could do something like the following:
- To keep things simple, use a Semaphore or some other construct with a fixed number of permits (that will be your maximum number of in-flight requests). Whenever you go to submit a query using executeAsync, acquire a permit. You should really only need 1 thread (though you may want to introduce a pool sized to the number of CPU cores) that acquires permits from the Semaphore and executes queries. It will simply block on acquire until a permit is available.
- Use Futures.addCallback on the future returned from executeAsync. The callback should call Semaphore.release() in both the onSuccess and onFailure cases. Releasing a permit allows your thread from step 1 to continue and submit the next request.
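Applied to the save method from the question, the two steps above could look roughly like the following minimal sketch. To keep it runnable without a cluster, it ASSUMES a hypothetical `writeAsync` method that returns a `CompletableFuture` and only simulates latency; with the driver you would instead call `session.executeAsync(bs)` and release the permit in both `onSuccess` and `onFailure` of the `Futures.addCallback` callback. The class name `ThrottledWriter` and the limit of 4 in-flight writes are illustrative choices, not values from the quoted answer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThrottledWriter {
    // Upper bound on writes in flight at once; 4 is an arbitrary demo value, tune for your cluster.
    static final int MAX_IN_FLIGHT = 4;

    private final Semaphore permits = new Semaphore(MAX_IN_FLIGHT);
    // Stand-in for the driver's own I/O threads that complete the futures.
    private final ExecutorService io = Executors.newFixedThreadPool(8);
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();
    private final AtomicInteger succeeded = new AtomicInteger();
    private final AtomicInteger failed = new AtomicInteger();

    // HYPOTHETICAL stand-in for session.executeAsync(bs): finishes after a random 1-4 ms delay.
    private CompletableFuture<Void> writeAsync(String process, int clientid, long deviceid) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(1, 5));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, io);
    }

    // Blocks the calling thread while MAX_IN_FLIGHT writes are outstanding.
    public void save(String process, int clientid, long deviceid) throws InterruptedException {
        permits.acquire();
        maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        CompletableFuture<Void> future;
        try {
            future = writeAsync(process, clientid, deviceid);
        } catch (RuntimeException e) {
            inFlight.decrementAndGet();
            permits.release(); // don't leak the permit if submission itself throws
            throw e;
        }
        // Equivalent of onSuccess/onFailure: the permit is released in both cases.
        future.whenComplete((ignored, error) -> {
            if (error == null) {
                succeeded.incrementAndGet();
            } else {
                failed.incrementAndGet();
            }
            inFlight.decrementAndGet();
            permits.release();
        });
    }

    // Waits for every outstanding write by temporarily taking all permits.
    public void drain() throws InterruptedException {
        permits.acquire(MAX_IN_FLIGHT);
        permits.release(MAX_IN_FLIGHT);
    }

    public void shutdown() { io.shutdown(); }
    public int succeededCount() { return succeeded.get(); }
    public int failedCount() { return failed.get(); }
    public int maxInFlightObserved() { return maxInFlight.get(); }

    public static void main(String[] args) throws Exception {
        ThrottledWriter writer = new ThrottledWriter();
        ExecutorService callers = Executors.newFixedThreadPool(10); // many threads calling save()
        for (int i = 0; i < 200; i++) {
            final int id = i;
            callers.submit(() -> {
                writer.save("process", id, id);
                return null;
            });
        }
        callers.shutdown();
        callers.awaitTermination(1, TimeUnit.MINUTES);
        writer.drain();
        writer.shutdown();
        System.out.println("succeeded=" + writer.succeededCount()
                + " maxInFlight=" + writer.maxInFlightObserved());
    }
}
```

Because `save` blocks in `acquire()` once the limit is reached, the calling threads are slowed down to the rate at which writes actually complete, which is the backpressure the question asks for. Releasing the permit in the failure path only frees capacity; the failed write itself is not retried here.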
I have also seen a couple of other posts that talk about using a RingBuffer or Guava's RateLimiter, so which one is better and which should I be using? Below are the options I can think of:
- Using Semaphore
- Using Ring Buffer
- Using Guava Rate Limiter
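For comparison, the "ring buffer" option usually means putting writes into a fixed-size buffer that producers block on when it is full. The sketch below uses the JDK's `ArrayBlockingQueue` (an array-backed bounded queue) as a simple stand-in, not the LMAX Disruptor or any specific RingBuffer library. `BoundedQueueWriter`, `Write` and `write` are hypothetical names, and `write` only counts requests instead of talking to Cassandra.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedQueueWriter {
    // Hypothetical value object for one pending write.
    static final class Write {
        final String process;
        final int clientid;
        final long deviceid;

        Write(String process, int clientid, long deviceid) {
            this.process = process;
            this.clientid = clientid;
            this.deviceid = deviceid;
        }
    }

    // Marker object that tells the consumer thread to stop.
    private static final Write STOP = new Write(null, 0, 0);

    private final AtomicInteger written = new AtomicInteger();
    private final BlockingQueue<Write> queue;
    private final Thread consumer;

    public BoundedQueueWriter(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity); // fixed-size, array-backed buffer
        this.consumer = new Thread(this::drainLoop, "cassandra-writer");
        this.consumer.start();
    }

    // Called by many producer threads; blocks when the buffer is full (backpressure).
    public void save(String process, int clientid, long deviceid) throws InterruptedException {
        queue.put(new Write(process, clientid, deviceid));
    }

    // Single consumer: takes writes in FIFO order until it sees STOP.
    private void drainLoop() {
        try {
            while (true) {
                Write w = queue.take();
                if (w == STOP) {
                    return;
                }
                write(w);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // HYPOTHETICAL stand-in for the actual Cassandra write (e.g. session.execute(bs)).
    private void write(Write w) {
        written.incrementAndGet();
    }

    // Call after all producers are done: STOP is queued behind every pending write.
    public void close() throws InterruptedException {
        queue.put(STOP);
        consumer.join();
    }

    public int writtenCount() { return written.get(); }
}
```

Here the backpressure comes from `queue.put()` blocking, and the single consumer limits concurrency to one synchronous write at a time; in practice the consumer could instead call `executeAsync` under a Semaphore permit. A rate limiter such as Guava's `RateLimiter` (`RateLimiter.create(permitsPerSecond)`, then `acquire()` before each write) caps requests per second instead: it does not react when Cassandra slows down, whereas a Semaphore caps in-flight requests and so adapts automatically to write latency.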
Can anyone help me with an example of how to throttle the requests, or apply backpressure to Cassandra writes, while making sure all writes go into Cassandra successfully?
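Since releasing a permit on failure only frees capacity, not losing writes also needs some failure handling. One possible approach, sketched here under stated assumptions, keeps the permit held while a failed write is retried with exponential backoff and, after a bounded number of attempts, records the write instead of dropping it silently. The write is passed as a hypothetical `Supplier<CompletableFuture<?>>` standing in for `session.executeAsync(bs)`; the class name, the attempt limit and the 10 ms base delay are illustrative assumptions. Blind retries like this are only safe for idempotent statements, such as the plain INSERT in the question.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RetryingThrottledWriter {
    private final int maxInFlight;
    private final int maxAttempts;
    private final Semaphore permits;
    // Runs delayed retries so no caller thread sleeps during backoff.
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger succeeded = new AtomicInteger();
    // Keys of writes that still failed after maxAttempts, kept for replay instead of being lost.
    private final ConcurrentLinkedQueue<String> failedKeys = new ConcurrentLinkedQueue<>();

    public RetryingThrottledWriter(int maxInFlight, int maxAttempts) {
        this.maxInFlight = maxInFlight;
        this.maxAttempts = maxAttempts;
        this.permits = new Semaphore(maxInFlight);
    }

    // `write` is a HYPOTHETICAL stand-in for () -> session.executeAsync(bs) adapted to CompletableFuture.
    public void save(String key, Supplier<CompletableFuture<?>> write) throws InterruptedException {
        permits.acquire(); // blocks while maxInFlight writes (including retries) are outstanding
        attempt(key, write, 1);
    }

    private void attempt(String key, Supplier<CompletableFuture<?>> write, int attemptNo) {
        CompletableFuture<?> future;
        try {
            future = write.get();
        } catch (RuntimeException e) {
            CompletableFuture<Object> failedNow = new CompletableFuture<>();
            failedNow.completeExceptionally(e); // treat a synchronous throw like an async failure
            future = failedNow;
        }
        future.whenComplete((ignored, error) -> {
            if (error == null) {
                succeeded.incrementAndGet();
                permits.release();
            } else if (attemptNo < maxAttempts) {
                long delayMs = 10L << (attemptNo - 1); // 10 ms, 20 ms, 40 ms, ...
                // The permit stays held, so retries still count toward maxInFlight.
                scheduler.schedule(() -> attempt(key, write, attemptNo + 1), delayMs, TimeUnit.MILLISECONDS);
            } else {
                failedKeys.add(key);
                permits.release();
            }
        });
    }

    // Waits until all writes, including pending retries, have finished, then stops the scheduler.
    public void close() throws InterruptedException {
        permits.acquire(maxInFlight);
        permits.release(maxInFlight);
        scheduler.shutdown();
    }

    public int succeededCount() { return succeeded.get(); }
    public int failedCount() { return failedKeys.size(); }
}
```

Writes that exhaust their attempts end up in `failedKeys`, where they could be logged or replayed later, so a failure is at least never silent.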