This is a beginner's question. I am streaming the output of several processes back to the client, and I want to store each stream in a separate file.

So what I want is this (a capital letter marks the end of a nested stream):

S: -a-a-a-b-b-a-A-c-b-B-c-c-...-d--C...D-e--...-z-E-z--z--...

R:  a-a-a-----a-A (complete)
          b-b-------b-B (complete)
                  c-----c-c--------C (complete)
                                d------D (complete)
                                         e--------E (complete)

                     .
                     .
                     . 
       (and many more nested streams coming)
                     .

So I want something like a dynamic factory of Observables. This is similar to using(), but as I understand it, using() creates a resource that lives as long as the original Observable, whereas I want to complete and close the file every time a nested stream completes.

IMPORTANT - I don't want to buffer in memory, as these are very long streams (the output of long-running processes). So I would like to avoid buffer() and groupBy().

1 Answer

If you can tell from a value whether it is the final one for a particular letter, you can use groupBy with takeUntil on the inner groups (you may need a few try-catches in the lambdas, since the write and close calls are likely to throw checked exceptions):

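source
.groupBy(v -> v.type)
.flatMap(g ->
     Observable.using(
         // open one output per group, keyed by the group's type
         () -> createOutput(g.getKey()),
         // write every value of the group, including the end marker, then complete
         f -> g.takeUntil(v -> v.isEnd).doOnNext(v -> f.write(v)),
         // close the output once the group has completed
         f -> f.close()
     )
)
.subscribe(...)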

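takeUntil completes each inner group as soon as its end marker arrives, so groupBy keeps a sub-stream open only while more values are expected for it. If the source is properly ordered, that is, no value for a key arrives after that key's end marker, groupBy won't recreate the group.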

If you really want to avoid groupBy, you have to track each open file manually and close it at the appropriate time:

Observable.using(
    // explicit type arguments so that map.get(...) below returns an Output
    () -> new HashMap<Object, Output>(),
    map -> source
       .doOnNext(v -> {
           Output o = map.get(v.type);
           if (o == null) {
               o = new Output(v.type);
               map.put(v.type, o);
           }
           o.write(v);
           if (v.isEnd) {
               o.close();
               map.remove(v.type);
           }
       }),
    map -> map.values().forEach(e -> e.close())
)
.subscribe(...);
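
The bookkeeping inside doOnNext can also be looked at on its own, without Rx. The following is a minimal plain-Java sketch of that logic, not the API of any library: StreamDemux, Event and the writer factory are hypothetical names introduced here for illustration, and it assumes each value carries its stream key, a text payload and an end flag. It also shows where the try-catch mentioned above would go.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: routes interleaved events to one Writer per stream key.
final class StreamDemux implements AutoCloseable {

    // Hypothetical event shape: stream key, payload, and end-of-stream flag.
    static final class Event {
        final String type;
        final String payload;
        final boolean isEnd;

        Event(String type, String payload, boolean isEnd) {
            this.type = type;
            this.payload = payload;
            this.isEnd = isEnd;
        }
    }

    // Only the currently open outputs are held here; payloads are never buffered.
    private final Map<String, Writer> open = new HashMap<>();
    private final Function<String, Writer> factory;

    StreamDemux(Function<String, Writer> factory) {
        this.factory = factory;
    }

    // Writes one event to its output, opening the output on first use
    // and closing it as soon as the end marker has been written.
    void accept(Event e) {
        Writer w = open.computeIfAbsent(e.type, factory);
        try {
            w.write(e.payload);
            if (e.isEnd) {
                open.remove(e.type);
                w.close();
            }
        } catch (IOException ex) {
            // Lambdas such as doOnNext cannot throw checked exceptions, so wrap it.
            throw new UncheckedIOException(ex);
        }
    }

    int openCount() {
        return open.size();
    }

    // Closes whatever is still open, e.g. when the source terminates early.
    @Override
    public void close() {
        for (Writer w : open.values()) {
            try {
                w.close();
            } catch (IOException ignored) {
                // best effort on shutdown
            }
        }
        open.clear();
    }
}
```

With Rx, accept would be called from doOnNext and close from the disposer passed to using. In real use the factory would open a file writer per key (wrapping the IOException it may throw) instead of an in-memory writer.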