My colleagues and I keep running into the same challenge, and I am hoping reactive programming can solve it. It will probably require my own implementation of Operator or Transformer, though.
I want to take any Observable<T> emitting T items and have an Operator group them by a property mapped from T, emitting each group as a List<T> or, even better, through some generic accumulator much like the Collector from Java 8 streams.
But here is the tricky part, which I don't think groupBy() can do. I want to pass two Observables through this Operator and assume the emitted items are sorted on that property (the incoming data would come from a sorted SQL query and be mapped to T objects). The operator will accumulate items serially until the property changes, then emit that group and move on to the next. That way I can take one group from each Observable, zip and process those two chunks, then throw them away and move on to the next pair. This way I only keep a semi-buffered state and my memory usage stays low.
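For reference, the same "accumulate until the key changes" behavior can be sketched without Rx, as a lazy Iterator over already-sorted input. This is a minimal sketch under that sorting assumption, not an Rx operator; SortedGroups and partition are hypothetical names, and the per-group accumulator is a java.util.stream.Collector (supplier, accumulator, finisher), matching the Collector idea above. Only the group currently being built is held in memory.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collector;

// Hypothetical helper: lazily splits a key-sorted iterator into runs of equal keys.
final class SortedGroups {

    private SortedGroups() {}

    static <T, P, A, R> Iterator<R> partition(Iterator<T> source,
                                              Function<? super T, ? extends P> keyOf,
                                              Collector<? super T, A, R> collector) {
        return new Iterator<R>() {
            private T pending;          // first item of the next group, already read from source
            private boolean hasPending; // whether 'pending' currently holds such an item

            @Override
            public boolean hasNext() {
                return hasPending || source.hasNext();
            }

            @Override
            public R next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T first = hasPending ? pending : source.next();
                hasPending = false;
                pending = null;

                P key = keyOf.apply(first);
                A container = collector.supplier().get();
                collector.accumulator().accept(container, first);

                // Keep reading until the key changes or the source runs out.
                while (source.hasNext()) {
                    T item = source.next();
                    if (!Objects.equals(key, keyOf.apply(item))) {
                        pending = item;     // belongs to the next group; hold it for the next call
                        hasPending = true;
                        break;
                    }
                    collector.accumulator().accept(container, item);
                }
                return collector.finisher().apply(container);
            }
        };
    }
}
```

For example, partition(rowIterator, Row::getPartitionId, Collectors.toList()) would yield one List per partition, where Row and getPartitionId stand in for the mapped SQL type. If the input is not actually sorted, a key that reappears later simply starts a new group rather than being merged.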
So if I were sorting, grouping, and zipping on PARTITION_ID, this is visually what I'm trying to accomplish:

I'm only doing this because I could have two queries of over a million records each, and I need to do complex comparisons on them side by side. I don't have the memory to load all the data from both sides at once, but I can scope it down to each sorted property value and break it up into batches. After each batch is processed, it becomes eligible for garbage collection and the Operator can move on to the next one.
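Zipping group by group assumes both queries produce exactly the same sequence of PARTITION_ID values; if a partition exists on only one side, a positional zip pairs the wrong groups from that point on. Because both sides are sorted, one way to avoid that is to pair groups by key, in the style of a sort-merge join. Below is a minimal sketch, assuming groups are non-null, each group's key can be read back (leftKey/rightKey), and Java's compareTo order matches the SQL ORDER BY (string collations in particular may differ). SortedGroupJoin and joinSorted are hypothetical names.

```java
import java.util.Iterator;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical helper: pairs up groups from two key-sorted sides, like a sort-merge join.
final class SortedGroupJoin {

    private SortedGroupJoin() {}

    // onPair receives (left, right) when keys match, and (left, null) or (null, right)
    // when a key exists on only one side. Groups themselves must be non-null.
    static <K extends Comparable<? super K>, L, R> void joinSorted(Iterator<L> left, Iterator<R> right,
                                                                   Function<? super L, K> leftKey,
                                                                   Function<? super R, K> rightKey,
                                                                   BiConsumer<? super L, ? super R> onPair) {
        L l = nextOrNull(left);
        R r = nextOrNull(right);
        while (l != null || r != null) {
            int cmp;
            if (l == null) {
                cmp = 1;   // left exhausted: remaining right groups are unmatched
            } else if (r == null) {
                cmp = -1;  // right exhausted: remaining left groups are unmatched
            } else {
                cmp = leftKey.apply(l).compareTo(rightKey.apply(r));
            }
            if (cmp == 0) {
                onPair.accept(l, r);
                l = nextOrNull(left);
                r = nextOrNull(right);
            } else if (cmp < 0) {
                onPair.accept(l, null);
                l = nextOrNull(left);
            } else {
                onPair.accept(null, r);
                r = nextOrNull(right);
            }
        }
    }

    private static <X> X nextOrNull(Iterator<X> it) {
        return it.hasNext() ? it.next() : null;
    }
}
```

Only one group per side is held at a time, so the memory profile stays the same as a plain zip while tolerating partitions that are missing on either side.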
This is the code I have so far, but I'm unclear how to proceed, since I do not want to emit anything until a batch is complete. How exactly do I do this?
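
    import java.util.function.BiConsumer;
    import java.util.function.Function;
    import java.util.function.Supplier;

    import rx.Observable;
    import rx.Observable.Transformer;

    public final class SortedPartitioner<T, P, C, R> implements Transformer<T, R> {

        private final Function<T, P> mappedPartitionProperty; // extracts the sort/partition key from an item
        private final Supplier<C> accumulatorSupplier;         // creates an empty container for a new group
        private final BiConsumer<C, T> accumulator;            // adds one item to the current group's container
        private final Function<C, R> finalResult;              // converts a finished container into the emitted value

        private SortedPartitioner(Function<T, P> mappedPartitionProperty, Supplier<C> accumulatorSupplier,
                                  BiConsumer<C, T> accumulator, Function<C, R> finalResult) {
            this.mappedPartitionProperty = mappedPartitionProperty;
            this.accumulatorSupplier = accumulatorSupplier;
            this.accumulator = accumulator;
            this.finalResult = finalResult;
        }

        public static <T, P, C, R> SortedPartitioner<T, P, C, R> of(
                Function<T, P> mappedPartitionProperty,
                Supplier<C> accumulatorSupplier,
                BiConsumer<C, T> accumulator,
                Function<C, R> finalResult) {
            return new SortedPartitioner<>(mappedPartitionProperty, accumulatorSupplier, accumulator, finalResult);
        }

        @Override
        public Observable<R> call(Observable<T> source) {
            // Still open: emit one R per run of consecutive items sharing the same partition key,
            // and nothing until that run is complete.
            return null;
        }
    }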
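The missing piece (emitting nothing until a batch is complete) is essentially a small state machine: hold the container for the current key, and hand back a finished group only when an item with a different key arrives or the source completes. Below is a minimal, Rx-free sketch of that state machine, reusing the same four functions as SortedPartitioner with the accumulator typed BiConsumer<C, T>. RunAccumulator and its onNext/onComplete methods are hypothetical names, and the sketch assumes the supplier and finisher never return null.

```java
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical push-style core of a sorted partitioner: feed items one at a time,
// get a group back only once it is complete. Mutable, so use one instance per stream.
final class RunAccumulator<T, P, C, R> {
    private final Function<T, P> keyOf;
    private final Supplier<C> supplier;
    private final BiConsumer<C, T> accumulator;
    private final Function<C, R> finisher;

    private C current;    // container for the group in progress; null before the first item
    private P currentKey; // key of the group in progress

    RunAccumulator(Function<T, P> keyOf, Supplier<C> supplier,
                   BiConsumer<C, T> accumulator, Function<C, R> finisher) {
        this.keyOf = keyOf;
        this.supplier = supplier;
        this.accumulator = accumulator;
        this.finisher = finisher;
    }

    // Adds an item. Returns the previous group if this item starts a new one, otherwise empty.
    Optional<R> onNext(T item) {
        P key = keyOf.apply(item);
        Optional<R> finished = Optional.empty();
        if (current != null && !Objects.equals(currentKey, key)) {
            finished = Optional.of(finisher.apply(current));
            current = null;
        }
        if (current == null) {
            current = supplier.get();
            currentKey = key;
        }
        accumulator.accept(current, item);
        return finished;
    }

    // Call once when the source completes: returns the last, still-open group, if any.
    Optional<R> onComplete() {
        if (current == null) {
            return Optional.empty();
        }
        R last = finisher.apply(current);
        current = null;
        return Optional.of(last);
    }
}
```

In RxJava 1.x, this logic could for instance back a custom Subscriber used with lift(), calling onNext/onComplete from the Subscriber's onNext/onCompleted and forwarding any present result downstream. That wiring is an assumption not shown here; in particular, when an item is absorbed without emitting, such an operator typically needs to request(1) more from upstream to keep backpressure flowing, and each subscription needs its own RunAccumulator.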