16
votes

I am using the DataStax Java driver 3.1.0 to connect to a Cassandra cluster, and my Cassandra cluster version is 2.0.10. I am writing asynchronously with QUORUM consistency.

  private final ExecutorService executorService = Executors.newFixedThreadPool(10);

  public void save(String process, int clientid, long deviceid) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          logger.logInfo("successfully written");
        }

        @Override
        public void onFailure(Throwable t) {
          logger.logError("error= ", t);
        }
      }, executorService);
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    }
  }

The save method above will be called from multiple threads at a very high rate.

Question:

I want to throttle the requests to the executeAsync method, which writes asynchronously into Cassandra. If I write faster than my Cassandra cluster can handle, it will start throwing errors, and I want all my writes to go into Cassandra successfully, without any loss.

I saw this post, where the suggested solution is to use a Semaphore with a fixed number of permits, but I am not sure how best to implement that. I have never used a Semaphore before. This is the logic from that post. Can anyone provide a Semaphore-based example built on my code? If there is a better option, let me know as well.

In the context of writing a dataloader program, you could do something like the following:

  • To keep things simple use a Semaphore or some other construct with a fixed number of permits (that will be your maximum number of inflight requests). Whenever you go to submit a query using executeAsync, acquire a permit. You should really only need 1 thread (but may want to introduce a pool of # cpu cores size that does this) that acquires the permits from the Semaphore and executes queries. It will just block on acquire until there is an available permit.
  • Use Futures.addCallback for the future returned from executeAsync. The callback should call Semaphore.release() in both onSuccess and onFailure cases. By releasing a permit, this should allow your thread in step 1 to continue and submit the next request.

I have also seen a couple of other posts that talk about using a RingBuffer or Guava's RateLimiter. Which one is better, and which should I be using? Below are the options I can think of:

  • Using Semaphore
  • Using Ring Buffer
  • Using Guava Rate Limiter

Can anyone help me with an example of how to throttle the requests, or get backpressure, for Cassandra writes while making sure all writes go into Cassandra successfully?


2 Answers

9
votes

This is not an authoritative answer, but it may be helpful. First, consider what you would do when a query cannot be executed right away. Whichever rate limiting you choose, if requests arrive faster than you can write them to Cassandra, your process will eventually clog up with waiting requests. At that point you need to tell your clients to hold their requests for a while ("push back"). For example, if they arrive via HTTP, the response status would be 429 "Too Many Requests". If you generate the requests in the same process, decide what the longest acceptable timeout is. That said, if Cassandra cannot keep up, it is time to scale (or tune) it.
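The "longest acceptable timeout" idea can be sketched with Semaphore.tryAcquire and a timeout. This is only a minimal, driver-free sketch: BoundedSubmitter and trySubmit are hypothetical names, and a Supplier of CompletableFuture stands in for the session.executeAsync(bs) call, so it is an assumption about how you would wire it up rather than the driver's own API.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper: caps in-flight async calls and gives up when no permit frees up in time. */
final class BoundedSubmitter {
    private final Semaphore permits;
    private final long timeoutMillis;

    BoundedSubmitter(int maxInFlight, long timeoutMillis) {
        this.permits = new Semaphore(maxInFlight);
        this.timeoutMillis = timeoutMillis;
    }

    /**
     * Runs the async call if a permit becomes available within the timeout.
     * An empty result means "push back": the caller should reject the request
     * (for example with HTTP 429) or retry later.
     */
    <T> Optional<CompletableFuture<T>> trySubmit(Supplier<CompletableFuture<T>> asyncCall) {
        try {
            if (!permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return Optional.empty();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            return Optional.empty();
        }
        CompletableFuture<T> future;
        try {
            future = asyncCall.get();
        } catch (RuntimeException e) {
            permits.release(); // nothing is in flight, so hand the permit back
            throw e;
        }
        // Release the permit on success and on failure alike.
        future.whenComplete((result, error) -> permits.release());
        return Optional.of(future);
    }
}
```

The number of permits and the timeout are the two knobs to tune: the first bounds the load on Cassandra, the second bounds how long a caller is kept waiting before being told to back off.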

Before implementing rate limits, it may be worth experimenting: add artificial delays in your threads before the call to the save method (using Thread.sleep(...)) and see whether that solves your problem or whether something else is needed.

A query returning an error is backpressure from Cassandra. You may choose or implement a RetryPolicy to determine when to retry failed queries.

You may also look at the connection pool options (especially the "Monitoring and tuning the pool" section of the driver documentation). The number of asynchronous requests per connection is tunable. However, the documentation says that for Cassandra 2.0 (native protocol v2) this parameter is capped at 128 and should not be changed (I'd experiment with it, though :)

An implementation with a Semaphore looks like this:

/* Share it among all threads, or keep one per thread for per-thread limits.
   Tune the number of permits to the load the cluster can accept.
*/
final Semaphore queryPermits = new Semaphore(20);


public void save(String process, int clientid, long deviceid) {
  ....
  // Blocks until a permit is available. acquire() would also work, but it
  // throws InterruptedException, which save() would then have to handle.
  queryPermits.acquireUninterruptibly();

  ResultSetFuture future;
  try {
    future = session.executeAsync(bs);
  } catch (RuntimeException e) {
    // Nothing was submitted, so no callback will ever release this permit.
    queryPermits.release();
    throw e;
  }
  Futures.addCallback(future, new FutureCallback<ResultSet>() {
    @Override
    public void onSuccess(ResultSet result) {
      queryPermits.release();
      logger.logInfo("successfully written");
    }
    @Override
    public void onFailure(Throwable t) {
      queryPermits.release(); // Permit should be released in all cases.
      logger.logError("error= ", t);
    }
  }, executorService);
  ....
}

(In real code I'd create a wrapper callback that releases the permit and then calls the wrapped callback's methods.)
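Such a wrapper callback might look roughly like the sketch below. To keep it compilable without Guava, it uses a small stand-in interface, Callback, with the same two methods as Guava's FutureCallback; both Callback and PermitReleasingCallback are hypothetical names introduced here for illustration.

```java
import java.util.concurrent.Semaphore;

/** Stand-in with the same two methods as Guava's FutureCallback, so the sketch compiles without Guava. */
interface Callback<V> {
    void onSuccess(V result);
    void onFailure(Throwable t);
}

/** Hypothetical wrapper: releases the permit first, then delegates to the wrapped callback. */
final class PermitReleasingCallback<V> implements Callback<V> {
    private final Semaphore permits;
    private final Callback<V> delegate;

    PermitReleasingCallback(Semaphore permits, Callback<V> delegate) {
        this.permits = permits;
        this.delegate = delegate;
    }

    @Override
    public void onSuccess(V result) {
        permits.release(); // released before delegating, so a throwing delegate cannot leak it
        delegate.onSuccess(result);
    }

    @Override
    public void onFailure(Throwable t) {
        permits.release();
        delegate.onFailure(t);
    }
}
```

With Guava on the classpath, the same wrapper would presumably implement FutureCallback<ResultSet> directly and be passed to Futures.addCallback in place of the anonymous class, which keeps the permit handling out of the logging code.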

Guava's RateLimiter is similar to a semaphore, but it limits requests based on timing (not on the total number of active queries) and allows temporary bursts after periods of underutilization.
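To make the difference from a semaphore concrete, here is a deliberately simplified, hypothetical fixed-interval pacer. It is not Guava's RateLimiter and does not reproduce its burst behaviour; it only illustrates limiting by time: each request is assigned a start slot, regardless of how many earlier requests are still in flight. IntervalPacer, reserve and acquire are names made up for this sketch.

```java
import java.util.concurrent.locks.LockSupport;

/** Hypothetical fixed-interval pacer: spaces requests evenly in time, with no burst allowance. */
final class IntervalPacer {
    private final long intervalNanos;
    private final long originNanos = System.nanoTime();
    private long nextFreeNanos; // earliest slot for the next request, relative to the origin

    IntervalPacer(double permitsPerSecond) {
        this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
    }

    /**
     * Reserves the next slot and returns how many nanoseconds the caller should
     * wait before sending. nowNanos is the elapsed time since the pacer's origin.
     */
    synchronized long reserve(long nowNanos) {
        long start = Math.max(nowNanos, nextFreeNanos);
        nextFreeNanos = start + intervalNanos;
        return start - nowNanos;
    }

    /** Blocks the calling thread until its reserved slot has arrived. */
    void acquire() {
        long deadline = System.nanoTime() + reserve(System.nanoTime() - originNanos);
        long remaining;
        while ((remaining = deadline - System.nanoTime()) > 0) {
            LockSupport.parkNanos(remaining); // may wake early, hence the loop
        }
    }
}
```

A pacer like this keeps the request rate steady, but it does not notice when Cassandra slows down: if responses take longer, in-flight requests still pile up. A semaphore reacts to exactly that, which is why the two are sometimes combined.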

However, requests will fail for various reasons anyway, so it is probably better to have a plan for retrying them (in case of intermittent errors).

It might not be appropriate in your case, but I'd try to use some queue or buffer to enqueue requests (e.g. java.util.concurrent.ArrayBlockingQueue). "Buffer full" would mean that clients should wait or give up the request. The buffer would also be used to re-enqueue failed requests. To be fairer, though, failed requests should probably be put at the front of the queue so they are retried first. One also has to handle the situation where the queue is full and new failed requests arrive at the same time. A single-threaded worker would then pick requests from the queue and send them to Cassandra. Since it does not do much, it is unlikely to become a bottleneck. This worker can also apply its own rate limits, e.g. based on timing with com.google.common.util.concurrent.RateLimiter.
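A rough sketch of that buffer, under a few assumptions: it uses a LinkedBlockingDeque instead of ArrayBlockingQueue so that failed requests can be put back at the front, a Function returning a CompletableFuture stands in for session.executeAsync, and a failed request that finds the buffer full is simply counted as dropped. WriteBuffer and its methods are hypothetical names.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical bounded buffer between request producers and a single sender thread. */
final class WriteBuffer<R> implements Runnable {
    private final BlockingDeque<R> pending;
    private final Function<R, CompletableFuture<?>> sender; // stands in for session.executeAsync
    private final AtomicInteger dropped = new AtomicInteger();

    WriteBuffer(int capacity, Function<R, CompletableFuture<?>> sender) {
        this.pending = new LinkedBlockingDeque<>(capacity);
        this.sender = sender;
    }

    /** Producer side: false means "buffer full", so the caller should wait or give up. */
    boolean offer(R request) {
        return pending.offerLast(request);
    }

    /** Sends the request at the head of the buffer; returns false if the buffer is empty. */
    boolean processNext() {
        R request = pending.pollFirst();
        if (request == null) {
            return false;
        }
        send(request);
        return true;
    }

    private void send(R request) {
        sender.apply(request).whenComplete((ok, error) -> {
            // A failed request goes back to the front so it is retried first.
            // If the buffer is full at that moment, it is counted as dropped.
            if (error != null && !pending.offerFirst(request)) {
                dropped.incrementAndGet();
            }
        });
    }

    /** Worker loop for one dedicated thread: blocks while the buffer is empty. */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                send(pending.takeFirst());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // treated as a shutdown signal
        }
    }

    int size() { return pending.size(); }

    int droppedCount() { return dropped.get(); }
}
```

The sketch leaves out two things a real version would need: a cap or delay on retries (a permanently failing request would otherwise be retried forever), and a limit on in-flight requests in the worker, for which the semaphore or a timing-based limiter could be used.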

If one wants to avoid losing messages as much as possible, one can put a message broker with persistence (e.g. Kafka) in front of Cassandra. This way incoming messages can survive even long outages of Cassandra. But I guess it's overkill in your case.

2
votes

Simply using a blocking queue should work fine. The futures complete on other threads, and their callbacks (success and failure) act as the consumer, while wherever you call the save method from acts as the producer.

An even better way would be to put the complete request itself in the queue and dequeue requests one by one, firing save on each dequeue.

private final ExecutorService executorService = Executors.newFixedThreadPool(10);

public void save(String process, int clientid, long deviceid, BlockingQueue<Object> queue) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          logger.logInfo("successfully written");
          // Free one slot. poll() never blocks and throws no checked exception,
          // unlike take(), which would not compile inside this callback.
          queue.poll();
        }

        @Override
        public void onFailure(Throwable t) {
          logger.logError("error= ", t);
          queue.poll(); // free the slot on failure too
        }
      }, executorService);
    } catch (Exception ex) {
      logger.logError("error= ", ex);
      queue.poll(); // no callback will run for this request, so free its slot here
    }
}

public void invokeSaveInLoop(int clientid, long deviceid) throws InterruptedException {
    Object dummyObj = new Object();
    // The queue capacity is the maximum number of in-flight requests.
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(20);
    for (int i = 0; i < 1000; i++) {
        queue.put(dummyObj); // blocks while 20 requests are already in flight
        save("process", clientid, deviceid, queue);
    }
}

If you want to go further and check the load on the cluster midway:

public static String getCurrentState() {
    StringBuilder response = new StringBuilder();
    response.append("Current Database Connection Status <br>\n ---------------------------------------------<br>\n");
    final LoadBalancingPolicy loadBalancingPolicy =
            cluster.getConfiguration().getPolicies().getLoadBalancingPolicy();
    final PoolingOptions poolingOptions =
            cluster.getConfiguration().getPoolingOptions();
    Session.State state = session.getState();
    for (Host host : state.getConnectedHosts()) {
        HostDistance distance = loadBalancingPolicy.distance(host);
        int connections = state.getOpenConnections(host);
        int inFlightQueries = state.getInFlightQueries(host);
        response.append(String.format("%s current connections=%d, max allowed connections=%d, current load=%d, max load=%d%n",
                        host, connections, poolingOptions.getMaxConnectionsPerHost(distance), inFlightQueries,
                        connections * poolingOptions.getMaxRequestsPerConnection(distance)))
                .append("<br>\n");
    }
    return response.toString();
}