I am writing a function that creates seven CompletableFutures. Each of these futures does two things:

  1. using supplyAsync(), fetch data from some DB
  2. using thenAccept(), write this data to a CSV file

When all the 7 futures have finished the job, I want to continue with further code execution. So, I am using allOf() and then calling a join() on the Void CompletableFuture returned by allOf().

The problem is, even after all futures have executed (I can see the CSVs getting generated), the join() call remains stuck and further code execution is blocked forever.

I have tried the following:

  1. Waiting on each future one by one, calling join() after each one. This works, but at the cost of concurrency, so I don't want to do this.

  2. Using get() with a timeout instead of join(). But this always ends up throwing an exception (get() always times out), which is undesirable.

  3. Saw this JDK bug: https://bugs.openjdk.java.net/browse/JDK-8200347. Not sure if this is a similar issue.

  4. Running without a join() or get(), which does not hold the thread execution and again is not desirable.
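As a diagnostic for attempt 2, a timed wait can be followed by an isDone() check on each future to see which one never completed. This is only a minimal sketch: the class name StuckFutureFinder, the helper findIncomplete, and the 200 ms timeout are made up for illustration and are not part of the code in this question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StuckFutureFinder {

    // Hypothetical helper: waits up to timeoutMillis for all futures, then
    // returns the indexes of those that are still not done.
    static List<Integer> findIncomplete(List<CompletableFuture<?>> futures, long timeoutMillis) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        try {
            all.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // At least one future did not finish in time; identified below.
        } catch (ExecutionException e) {
            // A future failed. Failed futures still count as done.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        List<Integer> incomplete = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            if (!futures.get(i).isDone()) {
                incomplete.add(i);
            }
        }
        return incomplete;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> finished = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> neverCompleted = new CompletableFuture<>();
        List<CompletableFuture<?>> futures = Arrays.asList(finished, neverCompleted);
        System.out.println("Incomplete futures: " + findIncomplete(futures, 200));
    }
}
```

If the returned list is empty while allOf(...).join() still hangs, the individual futures are not the ones blocking.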

The main function, which creates all the futures:

public CustomResponse process() {
        CustomResponse msgResponse = new CustomResponse();
        try {
            // 1. DbCall 1
            CompletableFuture<Void> f1 = dataHelper.fetchAndUploadCSV1();

            // 2. DbCall 2
            CompletableFuture<Void> f2 = dataHelper.fetchAndUploadCSV2();


            // 3. DbCall 3
            CompletableFuture<Void> f3 = dataHelper.fetchAndUploadCSV3();


            // 4. DbCall 4
            CompletableFuture<Void> f4 = dataHelper.fetchAndUploadCSV4();


            // 5. DbCall 5
            CompletableFuture<Void> f5 = dataHelper.fetchAndUploadCSV5();


            // 6. DbCall 6
            CompletableFuture<Void> f6 = dataHelper.fetchAndUploadCSV6();


            // 7. DbCall 7
            CompletableFuture<Void> f7 = dataHelper.fetchAndUploadCSV7();


            CompletableFuture<Void>[] fAll = new CompletableFuture[] {f1, f2, f3, f4, f5, f6, f7};


            CompletableFuture.allOf(fAll).join();
            msgResponse.setProcessed(true);
            msgResponse.setMessageStatus("message");
        } catch (Exception e) {
            msgResponse.setMessageStatus(ERROR);
            msgResponse.setErrorMessage("error");
        }
        return msgResponse;
    }

Each of the fetchAndUploadCSV() functions looks like this:

public CompletableFuture<Void> fetchAndUploadCSV1() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return someService().getAllData1();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).thenAccept(results -> {
            try {
                if (results.size() > 0) {
                    csvWriter.uploadAsCsv(results);
                }
                else {
                    log.info(" No data found..");
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

And this is what csvWriter.uploadAsCsv(results) looks like:

public <T> void uploadAsCsv(List<T> objectList) throws Exception {
        long objListSize = ((objectList==null) ? 0 : objectList.size());
        log.info("Action=Start, objectListSize=" + objListSize);
        ByteArrayInputStream inputStream = getCsvAsInputStream(objectList);
        Info fileInfo = someClient.uploadFile(inputStream);
        log.info("Action=Done, FileInfo=" + ((fileInfo==null ? null : fileInfo.getID())));
    }

I am using OpenCSV here to convert the data to a CSV stream, and I can always see the last log line.

Expected results: All data fetched, CSVs generated, and CustomResponse returned as processed with no error message.

Actual results: All data fetched, CSVs generated, and the main thread hung.

What Java version are you running this on? Have you tried with the most recent version of Java 8, or Java 10+, as those were indicated to work in the linked bug? - Kayaman
I am running Java 8 (u201-b09). Latest Java 8 is u211. I will try that. Haven't tried JDK 10. I won't be able to upgrade JDK in deployment anyway. - anand
Well the vague status of the bug could suggest it's related. If you can reliably reproduce it, maybe file a new ticket. - Kayaman
Yes, I have been trying to reproduce it without all the business logic and DB calls. Adding waits and sleeps. Unsuccessful so far. - anand
The use of CompletableFuture seems to be correct to me. When I tested it everything seemed to work. Are you sure that csvWriter.uploadAsCsv(results); works correctly and is not blocking? Maybe there is a problem that occurs after the files are created that blocks the execution. - Tobias

1 Answer

You can wait for all of the created CompletableFutures with a single join, without sacrificing concurrency:

public CustomResponse process() {
    CustomResponse msgResponse = new CustomResponse();

    List<CompletableFuture<Void>> futures = Arrays.asList(dataHelper.fetchAndUploadCSV1(),
            dataHelper.fetchAndUploadCSV2(),
            dataHelper.fetchAndUploadCSV3(),
            dataHelper.fetchAndUploadCSV4(),
            dataHelper.fetchAndUploadCSV5(),
            dataHelper.fetchAndUploadCSV6(),
            dataHelper.fetchAndUploadCSV7());

    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> {
                msgResponse.setProcessed(true);
                msgResponse.setMessageStatus("message");
                return msgResponse;
            })
            .exceptionally(throwable -> {
                msgResponse.setMessageStatus("ERROR");
                msgResponse.setErrorMessage("error");
                return msgResponse;
            }).join();
}

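allOf returns a new CompletableFuture that completes when all of the given CompletableFutures complete. So by the time the thenApply callback runs, every future has already finished, and calling join on any of them at that point would return immediately. In essence, the joining happens on already completed futures, and no future is waited on one at a time. The final join() still blocks the calling thread until the whole chain completes. Also, to handle possible exceptions, exceptionally should be invoked.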
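To try the pattern in isolation, here is a minimal self-contained sketch with no DB or upload calls. The class AllOfDemo and the helper fetchAndUpload are hypothetical stand-ins for the fetchAndUploadCSVn() methods from the question, and the returned strings replace CustomResponse only to keep the example short.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AllOfDemo {

    // Hypothetical stand-in for fetchAndUploadCSVn(): "fetches" one row
    // asynchronously, optionally failing, then "uploads" it.
    static CompletableFuture<Void> fetchAndUpload(int id, boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("fetch " + id + " failed");
            }
            return Arrays.asList("row-" + id);
        }).thenAccept(rows -> {
            // The real code would write the rows to a CSV here.
        });
    }

    static String process(boolean failOne) {
        // All three tasks are started before anything is waited on.
        List<CompletableFuture<Void>> futures = Arrays.asList(
                fetchAndUpload(1, false),
                fetchAndUpload(2, failOne),
                fetchAndUpload(3, false));

        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> "processed")   // runs only if every future succeeded
                .exceptionally(t -> "error")   // runs if any future failed
                .join();                       // blocks until the chain completes
    }

    public static void main(String[] args) {
        System.out.println(process(false)); // prints "processed"
        System.out.println(process(true));  // prints "error"
    }
}
```

If this sketch returns normally in the same environment where the original code hangs, that would point at something inside the real tasks rather than at allOf or join.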