I am querying DynamoDB by primary key. The primary key consists of two UUID fields (fieldUUID1, fieldUUID2). I have many queries to execute for this key combination, each with a list of values, so I run them asynchronously using CompletableFuture with an ExecutorService backed by a thread pool of size 4.
Each query gives me a CompletableFuture<QueryResult>. I combine them with the allOf method of CompletableFuture, which completes only once every query has finished and returns a CompletableFuture<Void>. From that, using a stream, I build a CompletableFuture<List<QueryResult>>.
If some of the queries are paginated, i.e. the result contains a lastEvaluatedKey, I have no way of knowing which QueryRequest produced it.
If I call .get() on each CompletableFuture as I receive it, that is a blocking operation, which defeats the purpose of going asynchronous. Is there a way I can handle this scenario?
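On the point of not knowing which request produced a given result, one possible approach is to attach the request to its result inside the future itself, so the pairing survives allOf. The following is only a minimal sketch under assumptions: `PairingSketch`, `queryAll`, and the generic `query` function are hypothetical names, and `query` stands in for a call such as dynamoDBClient.query(request) so that the example runs without the AWS SDK.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

class PairingSketch {

    // Runs every request asynchronously and keeps each request next to its result.
    // `query` is a hypothetical stand-in for dynamoDBClient.query(request).
    static <Q, R> CompletableFuture<List<Map.Entry<Q, R>>> queryAll(
            List<Q> requests, Function<Q, R> query, ExecutorService executor) {
        List<CompletableFuture<Map.Entry<Q, R>>> futures = new ArrayList<>();
        for (Q request : requests) {
            CompletableFuture<Map.Entry<Q, R>> paired = CompletableFuture
                    .supplyAsync(() -> query.apply(request), executor)
                    // The lambda captures `request`, so the pair travels together.
                    .thenApply(result -> new SimpleEntry<Q, R>(request, result));
            futures.add(paired);
        }
        // allOf completes when every future is done; join() then does not block.
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // Toy "query": the result is the length of the request string.
            queryAll(Arrays.asList("a", "bb", "ccc"), String::length, executor)
                    .thenAccept(pairs -> pairs.forEach(
                            p -> System.out.println(p.getKey() + " -> " + p.getValue())))
                    .join(); // blocking here only so the demo does not exit early
        } finally {
            executor.shutdown();
        }
    }
}
```

With the real client, Q would be QueryRequest and R would be QueryResult, so the loop over the combined list could read both the lastEvaluatedKey and the request that led to it.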
I could try the thenCompose method, but how do I know when to stop, i.e. when lastEvaluatedKey is absent?
Example:
    final List<CompletableFuture<QueryResult>> futures = new ArrayList<>();
    for (final QueryRequest queryRequest : queryRequests) {
        final CompletableFuture<QueryResult> futureResult =
                CompletableFuture.supplyAsync(() ->
                        dynamoDBClient.query(queryRequest), executorService);
        if (futureResult == null) {
            continue;
        }
        futures.add(futureResult);
    }
    // Wait for completion of all of the Futures provided
    final CompletableFuture<Void> allfuture = CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[futures.size()]));
    // The return type of the CompletableFuture.allOf() is a
    // CompletableFuture<Void>. The limitation of this method is that it does not
    // return the combined results of all Futures. Instead we have to manually get
    // results from Futures. CompletableFuture.join() method and Java 8 Streams API
    // makes it simple:
    final CompletableFuture<List<QueryResult>> allFutureList = allfuture.thenApply(val -> {
        return futures.stream().map(f -> f.join()).collect(Collectors.toList());
    });
    final List<QueryOutcome> completableResults = new ArrayList<>();
    try {
        try {
            // at this point all the Futures should be done, because we already executed
            // CompletableFuture.allOf method.
            final List<QueryResult> returnedResult = allFutureList.get();
            for (final QueryResult queryResult : returnedResult) {
                if (MapUtils.isNotEmpty(queryResult.getLastEvaluatedKey())) {
                    // how to get hold of original request and include last evaluated key ?
                }
            }
        } finally {
        }
    } finally {
    }
I could rely on the .get() method, but it would be a blocking call.
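For the thenCompose idea mentioned in the question, one way it could look is a method that fetches a page and, while a continuation key is present, composes another call to itself. The stop condition is then simply the branch where the key is absent. This is a hedged sketch, not the SDK's API: `PaginationSketch`, `Page`, `queryAllPages`, and `fakeFetch` are hypothetical names, and `Page` stands in for QueryResult so the example runs on its own. With the real client, the next page would presumably be requested by setting the exclusive start key on the QueryRequest to the previous result's lastEvaluatedKey.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;

class PaginationSketch {

    // Hypothetical stand-in for QueryResult: one page of items plus the key
    // to continue from (null when there are no more pages).
    static final class Page {
        final List<String> items;
        final String lastEvaluatedKey;

        Page(List<String> items, String lastEvaluatedKey) {
            this.items = items;
            this.lastEvaluatedKey = lastEvaluatedKey;
        }
    }

    // Fetches one page, then chains the next fetch with thenCompose while a
    // continuation key is present. The chain ends when the key is absent.
    // `request` stays in scope for every page, so the origin is never lost.
    static <Q> CompletableFuture<List<String>> queryAllPages(
            Q request, String startKey, BiFunction<Q, String, Page> fetch,
            Executor executor, List<String> collected) {
        return CompletableFuture
                .supplyAsync(() -> fetch.apply(request, startKey), executor)
                .thenCompose(page -> {
                    collected.addAll(page.items);
                    if (page.lastEvaluatedKey == null) {
                        // No more pages: finish with everything gathered so far.
                        return CompletableFuture.completedFuture(collected);
                    }
                    return queryAllPages(
                            request, page.lastEvaluatedKey, fetch, executor, collected);
                });
    }

    // Toy data source: two pages per request, linked by the key "k1".
    static Page fakeFetch(String request, String startKey) {
        if (startKey == null) {
            return new Page(Arrays.asList(request + "-1", request + "-2"), "k1");
        }
        if (startKey.equals("k1")) {
            return new Page(Collections.singletonList(request + "-3"), null);
        }
        throw new IllegalArgumentException("unknown key: " + startKey);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<String> all = queryAllPages(
                    "A", null, PaginationSketch::fakeFetch, executor, new ArrayList<>())
                    .join(); // blocking here only so the demo can print the result
            System.out.println(all); // prints [A-1, A-2, A-3]
        } finally {
            executor.shutdown();
        }
    }
}
```

Each request would get its own fully paginated future, and those futures could still be combined with allOf as before, so nothing has to block until the final result is actually needed.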
futureResult == null, supplyAsync() will never return null. - Didier L