4
votes

Blocking execute method from com.datastax.driver.core.Session

public ResultSet execute(Statement statement);

Comment on this method:

This method blocks until at least some result has been received from the database. However, for SELECT queries, it does not guarantee that the result has been received in full. But it does guarantee that some response has been received from the database, and in particular guarantee that if the request is invalid, an exception will be thrown by this method.

Non-blocking execute method from com.datastax.driver.core.Session

public ResultSetFuture executeAsync(Statement statement);

This method does not block. It returns as soon as the query has been passed to the underlying network stack. In particular, returning from this method does not guarantee that the query is valid or has even been submitted to a live node. Any exception pertaining to the failure of the query will be thrown when accessing the {@link ResultSetFuture}.

I have two questions about them, so it would be great if you could help me understand them.

Let's say I have 1 million records and I want all of them to arrive in the database (without any loss).

Question 1: If I have n threads, and each thread sends the same number of records to the database by issuing multiple insert queries through the blocking execute call, will increasing n also help speed up the time needed to insert all records into Cassandra?

Will this cause performance problems for Cassandra? Does Cassandra have to make sure that, for every single inserted record, all nodes in the cluster know about the new record immediately in order to maintain data consistency? (I assume a Cassandra node won't even think about using the local machine time to control the record insertion time.)

Question 2: With non-blocking execute, how can I ensure that all of the insertions are successful? The only way I know is to wait on the ResultSetFuture to check the execution of the insert query. Is there a better way? Is a non-blocking execute more likely to fail than a blocking execute?

Thank you very much for your help.


2 Answers

6
votes

If I have n threads, and each thread sends the same number of records to the database by issuing multiple insert queries through the blocking execute call, will increasing n also help speed up the time needed to insert all records into Cassandra?

To some extent. Let's set aside the client implementation details for a moment and look at things from the perspective of the number of concurrent requests, since you don't need one thread per in-flight request if you use executeAsync. In my testing I have found that while there is a lot of value in having a high number of concurrent requests, there is a threshold beyond which the returns diminish or performance starts to degrade. My general rule of thumb is number of nodes × native_transport_max_threads (default: 128) × 2, so for a 3-node cluster that is 3 × 128 × 2 = 768 concurrent requests, but you may find more optimal results with more or less.

The idea here is that there is not much value in enqueuing more requests than Cassandra can handle at a time. By limiting the number of in-flight requests, you also avoid unnecessary congestion on the connections between your driver client and Cassandra.

Question 2: With non-blocking execute, how can I ensure that all of the insertions are successful? The only way I know is to wait on the ResultSetFuture to check the execution of the insert query. Is there a better way? Is a non-blocking execute more likely to fail than a blocking execute?

Waiting on the ResultSetFuture via get is one route, but if you are developing a fully async application, you want to avoid blocking as much as possible. Using Guava, your two best weapons are Futures.addCallback and Futures.transform.

  • Futures.addCallback allows you to register a FutureCallback that gets executed when the driver has received the response. onSuccess gets executed in the success case, onFailure otherwise.

  • Futures.transform allows you to effectively map the returned ResultSetFuture into something else. For example, if you only want the value of one column, you could use it to transform ListenableFuture<ResultSet> into ListenableFuture<String> without having to block on the ResultSetFuture in your code and then extract the String value. (Both uses are sketched right after this list.)
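A rough sketch of both patterns follows. The users table, its columns, and the query strings are made up for illustration, and this assumes the Guava version bundled with the 2.x/3.x driver (the two-argument addCallback/transform overloads):

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

public class AsyncExamples {

    // Register a callback: onSuccess/onFailure run when the driver receives the response (or error).
    static void insertWithCallback(Session session, String name) {
        ResultSetFuture future = session.executeAsync(
                "INSERT INTO users (name) VALUES (?)", name);
        Futures.addCallback(future, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet rs) {
                // the insert was acknowledged by the cluster
            }
            @Override
            public void onFailure(Throwable t) {
                // the insert failed; log and/or retry here
                System.err.println("Insert failed: " + t);
            }
        });
    }

    // Map the ResultSetFuture into a ListenableFuture<String> without blocking.
    static ListenableFuture<String> selectName(Session session, int id) {
        ResultSetFuture future = session.executeAsync(
                "SELECT name FROM users WHERE id = ?", id);
        return Futures.transform(future, new Function<ResultSet, String>() {
            @Override
            public String apply(ResultSet rs) {
                Row row = rs.one();
                return row == null ? null : row.getString("name");
            }
        });
    }
}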

In the context of writing a dataloader program, you could do something like the following:

  1. To keep things simple, use a Semaphore or some other construct with a fixed number of permits (that will be your maximum number of in-flight requests). Whenever you go to submit a query using executeAsync, acquire a permit. You should really only need one thread (although you may want to introduce a pool of size equal to the number of CPU cores) that acquires permits from the Semaphore and executes queries; it will simply block on acquire until a permit is available.
  2. Use Futures.addCallback on the future returned from executeAsync. The callback should call Semaphore.release() in both the onSuccess and onFailure cases. By releasing a permit, it allows your thread in step 1 to continue and submit the next request. (A sketch of this pattern follows the list.)
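A minimal sketch of that two-step pattern; the users table and the MAX_IN_FLIGHT value (768, matching the 3-node rule-of-thumb example above) are illustrative, and error handling is reduced to a log line:

import java.util.List;
import java.util.concurrent.Semaphore;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;

public class ThrottledLoader {

    // Maximum number of in-flight requests; tune per the rule of thumb above.
    private static final int MAX_IN_FLIGHT = 768;

    public static void load(Session session, List<String> names) throws InterruptedException {
        final Semaphore permits = new Semaphore(MAX_IN_FLIGHT);
        PreparedStatement insert = session.prepare("INSERT INTO users (name) VALUES (?)");

        for (String name : names) {
            permits.acquire();  // step 1: blocks while MAX_IN_FLIGHT requests are already outstanding
            ResultSetFuture future = session.executeAsync(insert.bind(name));
            // step 2: release the permit when the response (or failure) comes back
            Futures.addCallback(future, new FutureCallback<ResultSet>() {
                @Override
                public void onSuccess(ResultSet rs) {
                    permits.release();
                }
                @Override
                public void onFailure(Throwable t) {
                    permits.release();  // also record/retry the failed insert here
                    System.err.println("Insert failed: " + t);
                }
            });
        }

        // Block until every outstanding request has completed before returning.
        permits.acquire(MAX_IN_FLIGHT);
    }
}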

To further improve throughput, you might want to consider using BatchStatement and submitting requests in batches. This is a good option if you keep your batches small (50-250 statements is a good range) and if all the inserts in a batch share the same partition key.
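A hedged sketch of what that could look like, assuming a hypothetical events table whose partition key is user_id:

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

public class BatchExample {

    // Groups rows sharing one partition key (user_id) into a single unlogged batch.
    static void insertEvents(Session session, String userId, Iterable<String> events) {
        PreparedStatement insert = session.prepare(
                "INSERT INTO events (user_id, event) VALUES (?, ?)");

        BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
        for (String event : events) {
            // every statement in the batch targets the same partition (userId)
            batch.add(insert.bind(userId, event));
        }
        // keep each batch small (roughly 50-250 statements), splitting if necessary
        session.execute(batch);  // or session.executeAsync(batch)
    }
}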

0
votes

In addition to the above answer:

It looks like execute() simply calls executeAsync(statement).getUninterruptibly(). So whether you manage your own pool of n threads that call execute() and block until execution completes (with at most n requests running), or call executeAsync() on all records, the Cassandra-side performance should be roughly the same, depending on execution time/count and timeouts.
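A minimal sketch of that equivalence, assuming a Statement named statement and an open Session named session:

// The blocking call:
ResultSet rs1 = session.execute(statement);

// Roughly what it does internally: submit asynchronously, then block on the future.
ResultSet rs2 = session.executeAsync(statement).getUninterruptibly();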

The executions all run on connections borrowed from a pool. Each execution gets a stream ID on the client side, and you are notified via the future when the response for that stream ID comes back. The number of in-flight executions is limited by the requests allowed per connection on the client side and by the request threads on each node picked to execute your request; anything beyond that is buffered in a queue (not blocked), up to the connection's maxQueueSize and maxRequestsPerConnection, and anything beyond that should fail. The beauty of this is that executeAsync() does not run on a new thread per request/execution.
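As an illustration, those client-side limits are configurable through PoolingOptions; this sketch assumes the 3.x driver API, and the contact point, keyspace name, and values are examples only:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.Session;

public class ClusterSetup {
    static Session connect() {
        PoolingOptions pooling = new PoolingOptions()
                // maximum requests in flight on one connection to a node in the local DC
                .setMaxRequestsPerConnection(HostDistance.LOCAL, 1024)
                // requests beyond that are queued, up to this size, before failing
                .setMaxQueueSize(256);

        Cluster cluster = Cluster.builder()
                .addContactPoint("127.0.0.1")
                .withPoolingOptions(pooling)
                .build();
        return cluster.connect("my_keyspace");
    }
}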

So there is a limit on how many requests can run, whether via execute() or executeAsync(); with execute() you simply never go beyond those limits.

Performance-wise, you will start seeing a penalty beyond what each node can handle, so execute() with a well-sized pool makes sense to me. Even better, use a reactive architecture to avoid creating many threads that do nothing but wait, since a large number of threads causes wasted context switching on the client side. For a smaller number of requests, executeAsync() is better because it avoids thread pools.

For reference, this is roughly what executeAsync() does inside the driver (no new thread is created per request; the future completes when the response for the request's stream ID arrives):

DefaultResultSetFuture future = new DefaultResultSetFuture(..., makeRequestMessage(statement, null));
new RequestHandler(this, future, statement).sendRequest();