I am trying to read in an Excel file, manipulate it or add new data to it, and write it back out. I also want to do this as a completely reactive process using Flux and Mono. The idea is to return the resulting file or byte array via a web service.

My question is: how do I get an InputStream and an OutputStream in a non-blocking way?

I am using the Apache POI library to read and generate the Excel file.

I currently have a solution based on a mix of Mono.fromCallable() and blocking code that gets the InputStream.

For example, the web service part is as follows:

@GetMapping(value = API_BASE_PATH + "/download", produces = "application/vnd.ms-excel")
public Mono<ByteArrayResource> download() {
    Flux<TimeKeepingEntry> createExcel = excelExport.createDocument(false);

    return createExcel.then(Mono.fromCallable(() -> {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        excelExport.getWb().write(outputStream);
        return new ByteArrayResource(outputStream.toByteArray());
    }).subscribeOn(Schedulers.elastic()));
}

And the processing of the file:

public Flux<TimeKeepingEntry> createDocument(boolean all) {
    // Default to an empty Flux so callers never receive null
    Flux<TimeKeepingEntry> entries = Flux.empty();
    try {
        InputStream inputStream = new ClassPathResource("Timesheet Template.xlsx").getInputStream();
        wb = WorkbookFactory.create(inputStream);
        Sheet sheet = wb.getSheetAt(0);

        log.info("Created document");

        if (all) {
            //all entries
        } else {
            entries = service.findByMonth(currentMonthName).log("Excel Export - retrievedMonths").sort(Comparator.comparing(TimeKeepingEntry::getDateOfMonth)).doOnNext(timeKeepingEntry-> {
                this.populateEntry(sheet, timeKeepingEntry);
            });
        }
    } catch (IOException e) {
        log.error("Error Importing File", e);
    }
    return entries;
}

This works well enough, but it is not very much in line with Flux and Mono. Some guidance here would be appreciated. I would prefer to have the whole sequence non-blocking.

1 Answer

Unfortunately, WorkbookFactory.create() is a blocking operation, so you have to perform it with imperative code. Fetching each timeKeepingEntry, however, can be done reactively. Your code would look something like this:

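public Flux<TimeKeepingEntry> createDocument() {
    return Flux.generate(
        this::getWorkbookSheet,       // blocking: opens the workbook once per subscriber
        (sheet, sink) -> {
            // Emit one entry per request; call sink.complete() once the sheet has no more rows.
            sink.next(getNextTimeKeepingEntryFrom(sheet));
            return sheet;             // the generator must return the state for the next round
        },
        this::closeWorkbook);         // runs on completion, error, or cancellation
}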

This will keep the workbook in memory, but will fetch each entry on demand when the elements of the Flux are requested.
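
Since the blocking load itself cannot be avoided, the usual approach is to confine it to a thread pool reserved for blocking work, which is what Mono.fromCallable(...).subscribeOn(...) does in your controller. Below is a minimal, dependency-free sketch of that idea using only the JDK (Java 9+), with CompletableFuture swapped in for Mono so it runs without Reactor or POI. The names BlockingOffloadSketch, loadAndModify, and exportAsync are hypothetical, and the byte copying merely stands in for WorkbookFactory.create() and Workbook.write().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingOffloadSketch {

    // Hypothetical stand-in for the blocking POI work: reads the whole
    // template stream, appends one line, and returns the resulting bytes.
    static byte[] loadAndModify(InputStream template, String extraLine) {
        try (InputStream in = template;
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            in.transferTo(out); // blocking read of the template
            out.write(extraLine.getBytes(StandardCharsets.UTF_8));
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Runs the blocking step on the supplied pool and returns immediately
    // with a handle to the eventual result.
    static CompletableFuture<byte[]> exportAsync(byte[] templateBytes,
                                                 String extraLine,
                                                 ExecutorService blockingPool) {
        return CompletableFuture.supplyAsync(
            () -> loadAndModify(new ByteArrayInputStream(templateBytes), extraLine),
            blockingPool);
    }

    public static void main(String[] args) {
        // A small pool dedicated to blocking I/O
        ExecutorService blockingPool = Executors.newFixedThreadPool(2);
        try {
            byte[] template = "header\n".getBytes(StandardCharsets.UTF_8);
            // join() is used here only to print the result in a demo;
            // a web handler would return the future (or Mono) instead.
            byte[] result = exportAsync(template, "row 1\n", blockingPool).join();
            System.out.print(new String(result, StandardCharsets.UTF_8));
        } finally {
            blockingPool.shutdown();
        }
    }
}
```

The calling thread only schedules the work and gets a future back. The stream handling still blocks, but it blocks a worker from the dedicated pool rather than an event-loop thread.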