I am using the DataStax Java driver 3.1.0 to connect to my Cassandra cluster (version 2.0.10), and I am writing asynchronously at QUORUM consistency.
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private final Semaphore concurrentQueries = new Semaphore(1000);

public void save(String process, int clientid, long deviceid) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
        BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
        bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
        bs.setString(0, process);
        bs.setInt(1, clientid);
        bs.setLong(2, deviceid);

        concurrentQueries.acquire();
        ResultSetFuture future = session.executeAsync(bs);
        Futures.addCallback(future, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet result) {
                concurrentQueries.release();
                logger.logInfo("successfully written");
            }

            @Override
            public void onFailure(Throwable t) {
                concurrentQueries.release();
                logger.logError("error= ", t);
            }
        }, executorService);
    } catch (Exception ex) {
        logger.logError("error= ", ex);
    }
}
My save method above is called from multiple threads at a very high rate. If I write faster than my Cassandra cluster can handle, it starts throwing errors, and I want all my writes to reach Cassandra successfully without any loss.
Question:
I was thinking of using some sort of queue or buffer to enqueue requests (e.g. java.util.concurrent.ArrayBlockingQueue). "Buffer full" would mean that clients should wait. The buffer would also be used to re-enqueue failed requests. To be fair, failed requests should probably be put at the front of the queue so they are retried first. We also need to handle the situation where the queue is full and new failed requests arrive at the same time. A single-threaded worker would then pick requests from the queue and send them to Cassandra; since it does little work itself, it is unlikely to become a bottleneck. This worker can apply its own rate limit, e.g. based on timing with com.google.common.util.concurrent.RateLimiter.
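The buffer described above could be sketched with a LinkedBlockingDeque, which supports blocking inserts at the tail for new requests and non-blocking inserts at the head for retries. This is only a sketch of the idea under my own assumptions; the class and method names below (WriteBuffer, enqueue, requeueFailed, next) are illustrative, not from the original code:

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical bounded buffer for pending Cassandra writes.
// New requests block at the tail when the buffer is full, so producers wait.
// Failed requests go back in at the head so they are retried first.
public class WriteBuffer {
    private final BlockingDeque<String> deque;

    public WriteBuffer(int capacity) {
        this.deque = new LinkedBlockingDeque<>(capacity);
    }

    // Called by producers: blocks while the buffer is full ("clients wait").
    public void enqueue(String request) throws InterruptedException {
        deque.putLast(request);
    }

    // Called by the worker when a write fails. offerFirst is non-blocking:
    // when the deque is full it returns false instead of deadlocking against
    // producers, and the caller must decide what to do (drop, log, or retry
    // the requeue with a timeout) -- this is the "queue full + new failures"
    // situation mentioned above.
    public boolean requeueFailed(String request) {
        return deque.offerFirst(request);
    }

    // Called by the single worker thread: blocks until a request is available.
    public String next() throws InterruptedException {
        return deque.takeFirst();
    }

    public int size() {
        return deque.size();
    }
}
```

Using a deque rather than a plain ArrayBlockingQueue is what makes the "failed requests jump to the front" behavior possible.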
What is the best way to implement such a queue or buffer that can also apply Guava rate limiting while writing to Cassandra? Or, if there is a better approach, let me know as well. I want to write to Cassandra at 2000 requests per second (this should be configurable so I can experiment to find the optimal setting).
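With Guava on the classpath, the worker would simply call RateLimiter.create(2000.0) once and acquire() before each write. To show the mechanics without the dependency, here is a minimal plain-Java sketch of the same even-spacing idea; SimpleRateLimiter is a hypothetical name of mine, not a Guava class, and the 2000/s rate is the configurable figure from the question:

```java
// Minimal stand-in for Guava's RateLimiter: acquire() blocks until the next
// permit is due, spacing permits evenly at the configured rate
// (e.g. 2000 permits/s => one permit every 500,000 ns).
public class SimpleRateLimiter {
    private final long intervalNanos;
    private long nextFreeNanos;

    public SimpleRateLimiter(double permitsPerSecond) {
        this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
        this.nextFreeNanos = System.nanoTime();
    }

    // Exposed for inspection: nanoseconds between consecutive permits.
    long intervalNanos() {
        return intervalNanos;
    }

    // Blocks the calling thread until its permit is due.
    public synchronized void acquire() throws InterruptedException {
        long now = System.nanoTime();
        long wait = nextFreeNanos - now;
        nextFreeNanos = Math.max(now, nextFreeNanos) + intervalNanos;
        if (wait > 0) {
            Thread.sleep(wait / 1_000_000L, (int) (wait % 1_000_000L));
        }
    }
}
```

In the real application I would use Guava's RateLimiter instead, since it additionally smooths bursts and supports warm-up periods.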
As noted in the comments below, if memory keeps increasing we can use a Guava Cache or ConcurrentLinkedHashMap (CLHM) to keep dropping the oldest records and make sure the program doesn't run out of memory. The box will have around 12 GB of memory and these records are very small, so I don't see this being a problem.
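For that memory bound, a plain-JDK equivalent of the Guava Cache / CLHM idea is a LinkedHashMap that evicts its eldest entry once a size limit is exceeded. The class name and the capacity used in the usage example are my own illustrative choices:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Size-bounded map that drops the oldest insertion when full -- a simple
// stand-in for an evicting Guava Cache or ConcurrentLinkedHashMap.
// Note: not thread-safe on its own; wrap with Collections.synchronizedMap
// (or use the real Guava/CLHM classes) for concurrent access.
public class BoundedBuffer<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public BoundedBuffer(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    // LinkedHashMap calls this after every put; returning true evicts
    // the eldest (oldest-inserted) entry.
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
    }
}
```

Guava's CacheBuilder.newBuilder().maximumSize(n) gives the same size-based eviction with thread safety built in, which is why the comments suggested it.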