7
votes

I am using the DataStax Java driver 3.1.0 to connect to a Cassandra cluster, and my Cassandra cluster version is 2.0.10. I am writing asynchronously with QUORUM consistency.

  private final ExecutorService executorService = Executors.newFixedThreadPool(10);
  private final Semaphore concurrentQueries = new Semaphore(1000);

  public void save(String process, int clientid, long deviceid) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      // Cap the number of in-flight async queries at 1000.
      concurrentQueries.acquire();
      try {
        ResultSetFuture future = session.executeAsync(bs);
        Futures.addCallback(future, new FutureCallback<ResultSet>() {
          @Override
          public void onSuccess(ResultSet result) {
            concurrentQueries.release();
            logger.logInfo("successfully written");
          }

          @Override
          public void onFailure(Throwable t) {
            concurrentQueries.release();
            logger.logError("error= ", t);
          }
        }, executorService);
      } catch (RuntimeException e) {
        // executeAsync failed synchronously, so the callback will never run:
        // release here to avoid leaking the permit.
        concurrentQueries.release();
        throw e;
      }
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    }
  }

My save method above will be called from multiple threads at a very high rate. If I write faster than my Cassandra cluster can handle, it will start throwing errors, and I want all my writes to go into Cassandra successfully without any loss.

Question:

I was thinking of using some sort of queue or buffer to enqueue requests (e.g. a java.util.concurrent.ArrayBlockingQueue). "Buffer full" would mean that clients should wait. The buffer would also be used to re-enqueue failed requests; to be fair, failed requests should probably be put at the front of the queue so they are retried first. We would also need to handle the situation where the queue is full and new failed requests arrive at the same time. A single-threaded worker would then pick requests from the queue and send them to Cassandra. Since it doesn't do much work, it is unlikely to become a bottleneck. This worker can apply its own rate limit, e.g. based on timing with com.google.common.util.concurrent.RateLimiter.

What is the best way to implement this queue/buffer feature so that it can also apply a Guava rate limit while writing into Cassandra? If there is a better approach, let me know as well. I want to write to Cassandra at 2000 requests per second (this should be configurable so that I can play with it to find the optimal setting).
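The scheme described above can be sketched with just the standard library; in practice Guava's RateLimiter would replace the hand-rolled pacing in takeThrottled(), and all class and field names here are illustrative assumptions, not a definitive implementation:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical record of one pending write (field names are illustrative). */
final class WriteTask {
    final String process;
    final int clientId;
    final long deviceId;

    WriteTask(String process, int clientId, long deviceId) {
        this.process = process;
        this.clientId = clientId;
        this.deviceId = deviceId;
    }
}

/**
 * Bounded buffer plus a simple fixed-rate throttle. A single worker thread
 * loops on takeThrottled() and hands each task to session.executeAsync();
 * the throttle state is unsynchronized, so it assumes exactly one worker.
 */
final class WriteBuffer {
    private final BlockingQueue<WriteTask> queue;
    private final long intervalNanos; // minimum spacing between sends
    private long nextFreeTimeNanos = System.nanoTime();

    WriteBuffer(int capacity, double requestsPerSecond) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.intervalNanos = (long) (1_000_000_000L / requestsPerSecond);
    }

    /** Producers block here when the buffer is full ("clients should wait"). */
    void submit(WriteTask task) throws InterruptedException {
        queue.put(task);
    }

    /** Worker side: dequeue the next task, sleeping if we are ahead of the target rate. */
    WriteTask takeThrottled() throws InterruptedException {
        WriteTask task = queue.take();
        long now = System.nanoTime();
        long wait = nextFreeTimeNanos - now;
        if (wait > 0) {
            TimeUnit.NANOSECONDS.sleep(wait);
        }
        nextFreeTimeNanos = Math.max(now, nextFreeTimeNanos) + intervalNanos;
        return task;
    }
}
```

For retry-first ordering as described above, a java.util.concurrent.LinkedBlockingDeque could replace the ArrayBlockingQueue, with failed tasks re-enqueued via putFirst(); the requestsPerSecond constructor argument is where the configurable 2000 req/s would go.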

As noted in the comments below, if memory keeps increasing we can use a Guava Cache or ConcurrentLinkedHashMap (CLHM) to keep dropping old records and make sure the program doesn't run out of memory. We will have around 12 GB of memory on the box and these records are very small, so I don't see this being a problem.
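A minimal standard-library sketch of that size-bounded eviction idea (similar in spirit to a Guava Cache built with maximumSize(...)); the class name and key/value types are placeholders, not part of the question's code:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Map that evicts its least-recently-accessed entry once maxEntries is
 * exceeded, so memory stays bounded even if producers outrun the writer.
 */
final class BoundedRecordMap<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    BoundedRecordMap(int maxEntries) {
        super(16, 0.75f, true); // access-order iteration => LRU eviction
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
    }
}
```

Note that LinkedHashMap is not thread-safe, which is one reason Guava Cache or CLHM would be preferable under concurrent access.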

1
Could you provide some info about the instances and cluster you are using, plus the table's CREATE statement, and describe the access pattern for this table a bit? What replication factor are you using? Writes in Cassandra are usually really fast; even on a very modest cluster you can go well beyond 2000 req/s. Can you also check that the statement is really prepared, and that the client isn't for some reason preparing the statement every time? What is the speed at which the data would come in without buffering? My gut feeling is that your Cassandra cluster might need to scale up/out a bit. – Marko Švaljek
We have three nodes in each datacenter with a replication factor of 3. We will be writing to this table at a very high speed and later reading it back for some offline analysis. Yes, I am caching the prepared statement once and then reusing it. The Cassandra cluster setup is not in my control, as another team in our company manages it, so I want to make sure that at least my code doesn't fail and we are able to write everything. – john
We could have used some other database, but since we are already using this one for other purposes, we decided to use it here as well. The goal is to write at a very high speed without losing data, and then later read those records back for some offline comparison. I just wanted to see how this queueing approach would work out compared to the normal one, and to understand how to implement it efficiently. – john
I understand now. The thing is, even if you use your memory as a buffer, at some point you might run out of it under load, so even if you could limit the rate it might actually be a bad idea. Have you considered putting these messages into some sort of queue like Kafka or SQS, and then having a simple app/process pull the messages out and push them to Cassandra at a rate you can control easily? This pattern works out very nicely. Managing all of this in memory might get you into even bigger trouble if Cassandra nodes die; with a physical queue in between you are much safer. – Marko Švaljek
And I feel your pain :( Basically, all it would take is for those guys to add a couple more instances and everything would be fine. – Marko Švaljek

1 Answer

2
votes

If I write faster than my Cassandra cluster can handle, it will start throwing errors, and I want all my writes to go into Cassandra successfully without any loss.

The DataStax driver lets you configure the number of connections per host and the number of concurrent requests per connection (see the PoolingOptions settings).

Adjust these settings to reduce the pressure on your Cassandra cluster.
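For reference, a minimal sketch of setting these pool options with driver 3.x; the contact point, keyspace name, and the specific numbers are illustrative assumptions, not tuning recommendations:

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.Session;

public class ClusterSetup {
    public static Session connect() {
        // Illustrative values: 2 core / 4 max connections per local host,
        // at most 256 in-flight requests per connection.
        PoolingOptions poolingOptions = new PoolingOptions()
            .setConnectionsPerHost(HostDistance.LOCAL, 2, 4)
            .setMaxRequestsPerConnection(HostDistance.LOCAL, 256);

        Cluster cluster = Cluster.builder()
            .addContactPoint("127.0.0.1")      // assumption: your contact point here
            .withPoolingOptions(poolingOptions)
            .build();
        return cluster.connect("mykeyspace");  // hypothetical keyspace name
    }
}
```

Lowering max requests per connection (or max connections per host) makes the driver itself push back sooner, which complements the semaphore in the question's code.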