I'd like to pass a file down the stream, but at an intermediate stage I'd like to process it by converting it into a stream of lines.

I have something like this:

Stream.flow(file1, file2, file3)
.via(processFileFlow)
.via(archiveFlow)

processFileFlow looks like this:

Flow.create().map(file1 -> Files.lines(file1)).map(line -> {send(line); return line;})

I can create a Pair of the file and each of its lines to pass the file down the stream, but then the archive step is invoked too many times, because the file is duplicated once for every line it contains. I want to run the archive flow only once per file.

What is the best way to do that?

1 Answer

Have a look at the fold[T](zero: T)(f: (T, Out) ⇒ T) method, which would let you reduce your (file, line) pairs back to, for example, a sequence of files before the archive step is called.
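
As a rough illustration of the folding idea, here is a minimal sketch in plain Java rather than the streaming library's own API. The class FoldFilesSketch and the methods step and filesToArchive are hypothetical names made up for this example. It starts from an empty set (the "zero") and applies a step function to every (file, line) pair, so each file ends up in the result exactly once.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: plain Java collections stand in for the stream stages.
public class FoldFilesSketch {

    // Fold step f: (T, Out) -> T. Adds the pair's file to the accumulator.
    // A LinkedHashSet keeps each file once, in first-seen order.
    static Set<Path> step(Set<Path> seen, Map.Entry<Path, String> pair) {
        seen.add(pair.getKey());
        return seen;
    }

    // Left fold over all (file, line) pairs, starting from an empty set (the zero).
    static List<Path> filesToArchive(List<Map.Entry<Path, String>> pairs) {
        Set<Path> acc = new LinkedHashSet<>();
        for (Map.Entry<Path, String> pair : pairs) {
            acc = step(acc, pair);
        }
        return new ArrayList<>(acc);
    }

    public static void main(String[] args) {
        // Three lines from two files: the file is repeated once per line.
        List<Map.Entry<Path, String>> pairs = new ArrayList<>();
        pairs.add(new SimpleEntry<>(Paths.get("a.txt"), "line 1"));
        pairs.add(new SimpleEntry<>(Paths.get("a.txt"), "line 2"));
        pairs.add(new SimpleEntry<>(Paths.get("b.txt"), "line 1"));

        // Each file appears once here, so an archive step would run once per file.
        System.out.println(filesToArchive(pairs));
    }
}
```

With a real fold operator, the same zero value and step function would be passed to fold directly. One caveat, stated as an assumption about typical stream semantics: a fold over the whole stream usually emits its result only after the upstream completes, so archiving would start once all lines have been sent.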