
I am querying DynamoDB by primary key. The primary key consists of two UUID fields (fieldUUID1, fieldUUID2). I have many queries to execute for this key combination, each with a different list of values, so I run them asynchronously using CompletableFuture with an ExecutorService backed by a thread pool of size 4.

Each query returns a CompletableFuture<QueryResult>. I combine them with CompletableFuture.allOf, which completes only when every query has finished and gives me a CompletableFuture<Void>. From that, using a stream, I build a CompletableFuture<List<QueryResult>>.

If some of the queries return paginated results, i.e. the result carries a lastEvaluatedKey, I have no way of knowing which QueryRequest produced it.

If I call .get() on each CompletableFuture as soon as I receive it, that is a blocking operation, which defeats the purpose of going asynchronous. Is there a way to handle this scenario?

I could try the thenCompose method, but how do I know when to stop, i.e. when lastEvaluatedKey is absent?

Example:

for (final QueryRequest queryRequest : queryRequests) {
    final CompletableFuture<QueryResult> futureResult =
        CompletableFuture.supplyAsync(() ->
            dynamoDBClient.query(queryRequest), executorService);

    if (futureResult == null) {
        continue;
    }

    futures.add(futureResult);
}

// Wait for completion of all of the Futures provided
final CompletableFuture<Void> allfuture = CompletableFuture
    .allOf(futures.toArray(new CompletableFuture[futures.size()]));

// The return type of the CompletableFuture.allOf() is a
// CompletableFuture<Void>. The limitation of this method is that it does not
// return the combined results of all Futures. Instead we have to manually get
// results from Futures. CompletableFuture.join() method and Java 8 Streams API
// makes it simple:
final CompletableFuture<List<QueryResult>> allFutureList = allfuture.thenApply(val -> {
    return futures.stream().map(f -> f.join()).collect(Collectors.toList());
});


final List<QueryOutcome> completableResults = new ArrayList<>();
try {
    try {
        // at this point all the Futures should be done, because we already executed
        // CompletableFuture.allOf method.
        final List<QueryResult> returnedResult = allFutureList.get();
        for (final QueryResult queryResult : returnedResult) {
            if (MapUtils.isNotEmpty(queryResult.getLastEvaluatedKey())) {
                // how to get hold of original request  and include last evaluated key ?
            }
        }
    } finally {

    }
} finally {

}

I could rely on the .get() method, but it would be a blocking call.

Why is it a problem to use a blocking call? All the previous fetching operations are already performed asynchronously in parallel, right? What should your thread be doing instead? As a side note, you don't need to check futureResult == null, supplyAsync() will never return null. - Didier L
Thanks for your response. Yes, the (futureResult == null) check is completely avoidable; thanks for the suggestion, Didier. But if one future hasn't returned its result yet, couldn't other queries that have already returned use this thread rather than having it wait for .get() to complete? There are 1000+ queries running, and an executor pool of 4 would be congested, wouldn't it? - Manish Gautam

1 Answer


The quick solution is to change your futures list. Instead of storing CompletableFuture<QueryResult>, store CompletableFuture<RequestAndResult>, where RequestAndResult is a simple data class holding a QueryRequest and its QueryResult. To do that you need to change your first loop so that each supplier returns the request together with the result.

Then, once the allfuture completes you can iterate over futures and get access to both the requests and the results.
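A minimal sketch of that change is below. To keep it runnable without the AWS SDK on the classpath it is written generically: `Q` stands in for QueryRequest, `R` for QueryResult, and `query` for the `dynamoDBClient.query` call. The names `RequestAndResultSketch` and `submitAll` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

final class RequestAndResultSketch {

    /** Hypothetical holder that keeps a request next to the result it produced. */
    static final class RequestAndResult<Q, R> {
        final Q request;
        final R result;

        RequestAndResult(Q request, R result) {
            this.request = request;
            this.result = result;
        }
    }

    /**
     * Starts one asynchronous call per request. Each future completes with the
     * request AND its result, so the caller can later tell which request
     * produced which result.
     */
    static <Q, R> List<CompletableFuture<RequestAndResult<Q, R>>> submitAll(
            List<Q> requests, Function<Q, R> query, Executor executor) {
        final List<CompletableFuture<RequestAndResult<Q, R>>> futures = new ArrayList<>();
        for (final Q request : requests) {
            futures.add(CompletableFuture.supplyAsync(
                () -> new RequestAndResult<>(request, query.apply(request)), executor));
        }
        return futures;
    }
}
```

With the real SDK types, `query` would be something like `dynamoDBClient::query` and the loop that inspects `getLastEvaluatedKey()` would read the originating request from the `request` field.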

However, there is a deeper issue here. What are you planning to do once you have access to the original QueryRequest? My guess is that you want to issue a follow-up request with exclusiveStartKey set to whatever the response's lastEvaluatedKey holds. This means that you will wait for all the original queries to complete and only then issue the next batch. This is inefficient: if a query returns with a lastEvaluatedKey, you want to issue its follow-up query as soon as possible.

To achieve this, my advice is to introduce a new method which takes a single QueryRequest object and returns a CompletableFuture<QueryResult>. Its implementation will be roughly as follows:

  • Issue a query with the given request.
  • Once the result arrives, check it. If its lastEvaluatedKey is empty, return it as the result of the method.
  • Otherwise, set request.exclusiveStartKey to that lastEvaluatedKey and go back to the first step.

Yes, it's a bit harder to do that with CompletableFutures (compared to blocking code), but it is totally doable.
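The three steps above can be sketched with thenCompose and recursion. This is only an illustration under assumptions: `Page` is a hypothetical stand-in for QueryResult, a plain String plays the role of the key map, and `fetchPage` stands in for a call to `dynamoDBClient.query` with exclusiveStartKey applied. It also departs from the description in one respect: it accumulates the items of every page rather than returning only the last result, since otherwise the earlier pages would be lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

final class PagedQuerySketch {

    /** Hypothetical stand-in for one page of query results. */
    static final class Page {
        final List<String> items;
        final String lastEvaluatedKey; // null means there are no more pages

        Page(List<String> items, String lastEvaluatedKey) {
            this.items = items;
            this.lastEvaluatedKey = lastEvaluatedKey;
        }
    }

    /** Fetches every page for one logical query and completes with all items. */
    static CompletableFuture<List<String>> queryAllPages(
            Function<String, Page> fetchPage, Executor executor) {
        return fetchFrom(null, new ArrayList<>(), fetchPage, executor);
    }

    private static CompletableFuture<List<String>> fetchFrom(
            String startKey, List<String> collected,
            Function<String, Page> fetchPage, Executor executor) {
        return CompletableFuture
            // Step 1: issue the query (startKey is null for the first page).
            .supplyAsync(() -> fetchPage.apply(startKey), executor)
            .thenCompose(page -> {
                collected.addAll(page.items);
                // Step 2: no lastEvaluatedKey means this was the final page.
                if (page.lastEvaluatedKey == null) {
                    return CompletableFuture.completedFuture(collected);
                }
                // Step 3: otherwise chain the follow-up query immediately.
                return fetchFrom(page.lastEvaluatedKey, collected, fetchPage, executor);
            });
    }
}
```

The recursion stops exactly when a page arrives without a lastEvaluatedKey, and no thread is parked waiting between pages: each follow-up is scheduled from the completion of the previous one.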

Once you have that method, your code needs to call it once for each of the requests in queryRequests, put the returned CompletableFutures in a list, and call CompletableFuture.allOf() on that list. Once the allOf future completes you can just use the results - there is no need to issue follow-up queries.
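As a rough sketch of that final step, assuming some per-request method that already follows pagination to the end (passed in here as the hypothetical `queryAllPages` function), the fan-out and the allOf could look like this. `RunAllSketch` and `runAll` are illustrative names, and `Q` and `T` stand in for the request type and the per-request result type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class RunAllSketch {

    /**
     * Calls queryAllPages once per request and returns a future that completes
     * with all results, in request order, once every request has finished.
     */
    static <Q, T> CompletableFuture<List<T>> runAll(
            List<Q> requests, Function<Q, CompletableFuture<T>> queryAllPages) {
        final List<CompletableFuture<T>> futures = new ArrayList<>();
        for (final Q request : requests) {
            futures.add(queryAllPages.apply(request));
        }
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
                final List<T> results = new ArrayList<>();
                for (final CompletableFuture<T> future : futures) {
                    // Every future is already complete here, so join() does not block.
                    results.add(future.join());
                }
                return results;
            });
    }
}
```

The caller can attach further processing with thenAccept or thenApply on the returned future, and only needs a single blocking get() or join() at the very end if the calling thread truly has to wait.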