I'd like to pass file down to the stream, but in some intermediate stage I'd like to process it by converting it to stream of lines.
I have something like this:
Stream.flow(file1, file2, file3)
.via(processFileFlow)
.via(archiveFlow)
ProcessFileFlow looks like this:
Flow.create().map(file1 -> Files.lines(file1)).map(line -> {send(line); return line;})
I can create Pair of file and file line, to pass file down to the stream, but then archive method is invoked too many times, because file is duplicated as many times as number of lines in this file. I want to run archive flow only once for a file.
What is the best way to do that?