I am trying to create an Akka Streams javadsl Flow that processes Path elements. It should emit the lines of the file that each Path refers to. In other words, I think I need to flatten the stream of Path elements into a stream of String elements.

A Flow does have a flatten method, but it requires a FlattenStrategy and I am unsure how to use it in my case.

final Flow<Path, String, BoxedUnit> toFileLines = Flow
    .<Path>create()
    .flatten(FlattenStrategy.

Any help is greatly appreciated!

Edit 1: I now understand that it might be a good idea to read the file with a StreamReader and emit a new String whenever the reader reaches a '\n'. So the question becomes how to emit multiple elements from a single transformation step, like so:

final Flow<Path, String, BoxedUnit> toFileLines = Flow
    .<Path>create()
    .mapAsync(
            //create streamreader
            //while streamreader has.next
                //read line until \n
                //emit line
    );

Is this possible?

Try to think of it as a multistep process: 0.) you get a stream of Paths as input 1.) stream the content of each file 2.) split each file into lines 3.) concatenate all the lines from all the files. The FlattenStrategy will be used only for the last step. It specifies how content from the incoming sources should be "flattened", i.e. whether they should be concatenated or merged (which isn't yet implemented). - jrudolph
@jrudolph Thanks for your reply. I am interested in steps 1 and 2 of your summary. How should I stream the contents (String lines) of a file in a neat and preferably parallel manner? The files can get large, and I would like to read them lazily instead of loading the whole file at once. - JHeut
Could it be possible to use the Java 8 streams implementation of Files.lines, for example? - JHeut
You can do 1.) using a SynchronousFileSource. 2.) will be much easier with the upcoming RC4, which will include components for splitting string streams: github.com/akka/akka/pull/17446/files - jrudolph

1 Answer

I suggest using Flow.flatMapConcat. First write a Source that produces the lines of a single file, then use flatMapConcat to concatenate those Sources in order. Splitting the ByteString chunks into lines is done with Framing, which I took from the docs:

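import java.io.File;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Flow;
import akka.stream.io.Framing;
import akka.util.ByteString;
import scala.runtime.BoxedUnit;

// Upper bound on the length of a single line, in bytes; a longer line fails the stream.
int maxLineSize = 1024;

// Matches Windows line endings; use "\n" for files with Unix line endings.
final ByteString delim = ByteString.fromString("\r\n");

// Takes file paths as Strings and emits the lines of each file, one file after another.
final Flow<String, String, BoxedUnit> pathsToContents =
  Flow.of(String.class)
      .flatMapConcat(path -> FileIO.fromFile(new File(path))
                                   // true = allowTruncation: also emit a last line that has no trailing delimiter
                                   .via(Framing.delimiter(delim, maxLineSize, true))
                                   .map(byteStr -> byteStr.utf8String()));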
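
For comparison, the Files.lines idea raised in the comments can be sketched with plain Java 8 streams. This is only an illustration of the same "flatten paths into lines" shape, not an Akka Streams solution: the class name FileLines and the helpers linesOf and toFileLines are made up for this example, and java.util.stream has no asynchronous backpressure, so it is not a drop-in replacement for the Flow above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper class, named for this example only.
final class FileLines {

    // Opens a single file as a stream of lines; Files.lines reads lazily
    // instead of loading the whole file into memory.
    static Stream<String> linesOf(Path path) {
        try {
            return Files.lines(path, StandardCharsets.UTF_8);
        } catch (IOException e) {
            // Stream lambdas cannot throw checked exceptions, so wrap it.
            throw new UncheckedIOException(e);
        }
    }

    // Flattens a stream of paths into one stream of lines, file by file, in order.
    // Stream.flatMap closes each inner stream after its contents have been
    // consumed, which releases the underlying file handle.
    static Stream<String> toFileLines(Stream<Path> paths) {
        return paths.flatMap(FileLines::linesOf);
    }

    public static void main(String[] args) throws IOException {
        // Two small temporary files stand in for the real input.
        Path a = Files.createTempFile("a", ".txt");
        Path b = Files.createTempFile("b", ".txt");
        try {
            Files.write(a, Arrays.asList("a1", "a2"), StandardCharsets.UTF_8);
            Files.write(b, Arrays.asList("b1", "b2"), StandardCharsets.UTF_8);

            List<String> lines = toFileLines(Stream.of(a, b))
                    .collect(Collectors.toList());
            System.out.println(lines); // prints [a1, a2, b1, b2]
        } finally {
            Files.deleteIfExists(a);
            Files.deleteIfExists(b);
        }
    }
}
```

Stream.flatMap plays the role that flatMapConcat plays in the answer: each Path is turned into an inner stream of lines, and the inner streams are concatenated in order. Files.lines also accepts both "\n" and "\r\n" line endings, so no explicit delimiter is needed in this variant.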