
My colleagues and I frequently run into the same challenge, and I am hoping reactive programming can solve it. It will probably require my own implementation of an Operator or Transformer, though.

I want to take any Observable<T> and have an Operator group its items by a key mapped from T, emitting each group as a List<T>, or, even better, through some generic accumulator much like the Collector from Java 8 streams.

But here is the tricky part that I don't think groupBy() can handle. I want to pass two Observables through this Operator and assume their emitted items are sorted on that property (the incoming data comes from a sorted SQL query and is mapped to T objects). The operator will accumulate items serially until the property changes, then emit that group and move on to the next. That way I can take each group from each Observable, zip and process the two chunks, then throw them away and move on to the next pair. This lets me maintain a semi-buffered state, holding only the current group from each side, and keep my memory usage low.
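The "accumulate until the property changes" behaviour can be sketched without RxJava as a lazy Iterator over already-sorted input. This is a minimal plain-Java illustration, not an Rx operator. The class name ConsecutiveGroups is hypothetical, and the sketch assumes the source is sorted (or at least clustered) by the key.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical helper: lazily splits a key-sorted iterator into runs of
// consecutive items that share the same key.
final class ConsecutiveGroups<T, K> implements Iterator<List<T>> {
    private final Iterator<T> source;
    private final Function<T, K> keyOf;
    private T lookahead;          // first item of the next run, already pulled from source
    private boolean hasLookahead;

    ConsecutiveGroups(Iterator<T> source, Function<T, K> keyOf) {
        this.source = source;
        this.keyOf = keyOf;
        advance();
    }

    // Pull one item from the source into the look-ahead slot, if there is one.
    private void advance() {
        hasLookahead = source.hasNext();
        lookahead = hasLookahead ? source.next() : null;
    }

    @Override
    public boolean hasNext() {
        return hasLookahead;
    }

    @Override
    public List<T> next() {
        if (!hasLookahead) {
            throw new NoSuchElementException();
        }
        K key = keyOf.apply(lookahead);
        List<T> run = new ArrayList<>();
        // Keep pulling while the key is unchanged; the first item with a new key
        // stays in the look-ahead slot and starts the next run.
        while (hasLookahead && Objects.equals(keyOf.apply(lookahead), key)) {
            run.add(lookahead);
            advance();
        }
        return run;
    }
}
```

A run is only assembled when next() is called, so the iterator itself holds just the run in progress plus one look-ahead item. That is the bounded-memory profile described above.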

So if I were sorting, grouping, and zipping on PARTITION_ID, this is visually what I'm trying to accomplish:

[Diagram: sorting, grouping, and zipping on PARTITION_ID]

I'm only doing this because I could have two queries of over a million records each, and I need to do complex side-by-side comparisons. I don't have the memory to load all the data from both sides at once, but I can scope it down to each sorted property value and break it up into batches. After each batch is processed, the GC can discard it and the Operator can move on to the next one.

This is the code I have so far, but I'm unclear how to proceed, because I do not want to emit anything until a batch is complete. How exactly do I do this?

import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

import rx.Observable;
import rx.Observable.Transformer;

public final class SortedPartitioner<T,P,C,R> implements Transformer<T,R> {

 private final Function<T,P> mappedPartitionProperty;
 private final Supplier<C> accumulatorSupplier;
 // adds an item T into the container C, like Collector.accumulator()
 private final BiConsumer<C,T> accumulator;
 private final Function<C,R> finalResult;

 private SortedPartitioner(Function<T, P> mappedPartitionProperty, Supplier<C> accumulatorSupplier,
   BiConsumer<C, T> accumulator, Function<C, R> finalResult) {
  this.mappedPartitionProperty = mappedPartitionProperty;
  this.accumulatorSupplier = accumulatorSupplier;
  this.accumulator = accumulator;
  this.finalResult = finalResult;
 }

 public static <T,P,C,R> SortedPartitioner<T,P,C,R> of(
   Function<T,P> mappedPartitionProperty,
   Supplier<C> accumulatorSupplier,
   BiConsumer<C,T> accumulator,
   Function<C,R> finalResult) {

  return new SortedPartitioner<>(mappedPartitionProperty, accumulatorSupplier, accumulator, finalResult);

 }

 @Override
 public Observable<R> call(Observable<T> t) {
  // not implemented yet: should emit one R per run of consecutive
  // items that share the same partition property
  return null;
 }

}
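The core of "don't emit until a batch is complete" is a small state machine: buffer into the current container, emit only when the key changes, and flush the last open batch on completion. Below is a minimal plain-Java sketch of that state, using the same T/P/C/R roles as SortedPartitioner. SortedPartitionState is a hypothetical name, and a java.util.function.Consumer stands in for the downstream Subscriber.

```java
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper: per-subscription state an operator like SortedPartitioner might keep.
final class SortedPartitionState<T, P, C, R> {
    private final Function<T, P> partitionKey;
    private final Supplier<C> accumulatorSupplier;
    private final BiConsumer<C, T> accumulator;
    private final Function<C, R> finalResult;
    private final Consumer<R> downstream;   // stands in for the child Subscriber's onNext

    private C current;       // container of the open batch; null when no batch is open
    private P currentKey;    // partition key of the open batch

    SortedPartitionState(Function<T, P> partitionKey, Supplier<C> accumulatorSupplier,
            BiConsumer<C, T> accumulator, Function<C, R> finalResult, Consumer<R> downstream) {
        this.partitionKey = partitionKey;
        this.accumulatorSupplier = accumulatorSupplier;
        this.accumulator = accumulator;
        this.finalResult = finalResult;
        this.downstream = downstream;
    }

    void onNext(T item) {
        P key = partitionKey.apply(item);
        if (current != null && !Objects.equals(key, currentKey)) {
            // The key changed, so the previous batch is complete: emit it and drop it.
            downstream.accept(finalResult.apply(current));
            current = null;
        }
        if (current == null) {
            // Start a new batch for this key.
            current = accumulatorSupplier.get();
            currentKey = key;
        }
        accumulator.accept(current, item);
    }

    void onCompleted() {
        // The source is done, so the still-open batch (if any) is complete too.
        if (current != null) {
            downstream.accept(finalResult.apply(current));
            current = null;
        }
    }
}
```

In a hand-written RxJava 1.x operator, onNext calls that emit nothing would likely also need to request more items from upstream, or backpressure could stall the stream. That bookkeeping is deliberately left out of this sketch.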

2 Answers


Here is another answer for you that uses a library on Maven Central and is much shorter.

Add this dependency to your pom.xml.

<dependency>
    <groupId>com.github.davidmoten</groupId>
    <artifactId>rxjava-extras</artifactId>
    <version>0.5.13</version>
</dependency>

To group the items that share the same partitionId, do this:

import com.github.davidmoten.rx.Transformers;

Observable<List<Item>> grouped = items.compose(
    Transformers.toListWhile(
        (list, item) -> list.isEmpty() || list.get(0).partitionId == item.partitionId));

Testing is pretty comprehensive for this method (see also Transformers.collectWhile for data structures other than lists), but you can check the source for yourself on GitHub.

Then proceed with the zip.
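One caveat on the zip step: zip pairs the n-th group from one side with the n-th group from the other purely by position, so the pairing is only meaningful if both queries yield exactly the same sequence of partition ids. Below is a minimal plain-Java sketch of that positional pairing. GroupZip and zipGroups are hypothetical names, and the key check is an added safeguard rather than something zip does for you.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical helper, not part of RxJava: a positional zip over two grouped sequences.
final class GroupZip {
    static <A, B, K> void zipGroups(Iterator<List<A>> left, Iterator<List<B>> right,
            Function<A, K> leftKey, Function<B, K> rightKey,
            BiConsumer<List<A>, List<B>> process) {
        // Like zip, stop as soon as either side runs out of groups.
        while (left.hasNext() && right.hasNext()) {
            List<A> l = left.next();
            List<B> r = right.next();
            // Groups are assumed non-empty, so the first element carries the key.
            K lk = leftKey.apply(l.get(0));
            K rk = rightKey.apply(r.get(0));
            if (!Objects.equals(lk, rk)) {
                throw new IllegalStateException("partition mismatch: " + lk + " vs " + rk);
            }
            // Process the pair; nothing is retained, so both groups can be collected afterwards.
            process.accept(l, r);
        }
    }
}
```

If a partition can be missing on one side, a merge-style walk that advances whichever side has the smaller key would be needed instead of a plain positional zip.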


This is a tricky one, but one I have also come across frequently.

The trick is to use materialize, scan and flatMap. scan accumulates a list of values with the same partitionId, plus the next differing value if there is one. materialize is needed because we need to know when the source completes, so that anything left over can still be emitted. flatMap takes each list and value and emits the list when a value is present (we have just switched to a new partitionId), and emits the leftover when the stream completes.

Below is a unit test showing that the source 1, 1, 2, 2, 2, 3 produces the lists {1, 1}, {2, 2, 2}, {3}.

For your use case, you just need to apply this technique to both sources and then zip them together.

The code:

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import org.junit.Test;

import rx.Observable;

public class StateMachineExampleTest {

    @Test
    public void testForStackOverflow() {
        Observable<Integer> a = Observable.just(1, 1, 2, 2, 2, 3);
        State<Integer> initial = new State<Integer>(Collections.emptyList(), Optional.empty(),
                false);
        List<List<Integer>> lists = a.materialize()
                // accumulate lists and uses onCompleted notification to emit
                // left overs when source completes
                .scan(initial,
                        (state, notification) -> {
                            if (notification.isOnCompleted()) {
                                // the source has finished: whatever has not been emitted yet is
                                // either the pending value or the list that was still being built
                                List<Integer> leftover = state.value.isPresent()
                                        ? Arrays.asList(state.value.get()) : state.list;
                                return new State<>(leftover, Optional.empty(), true);
                            } else if (notification.isOnError())
                                throw new RuntimeException(notification.getThrowable());
                            else if (state.list.size() == 0) {
                                return new State<>(Arrays.asList(notification.getValue()), Optional
                                        .empty(), false);
                            } else if (partitionId(notification.getValue()) == partitionId(state.list
                                    .get(0))) {
                                List<Integer> list = new ArrayList<>();
                                list.addAll(state.list);
                                list.add(notification.getValue());
                                return new State<>(list, Optional.empty(), false);
                            } else if (state.value.isPresent()) {
                                if (partitionId(state.value.get()) == partitionId(notification
                                        .getValue())) {
                                    return new State<>(Arrays.asList(state.value.get(),
                                            notification.getValue()), Optional.empty(), false);
                                } else {
                                    return new State<>(Arrays.asList(state.value.get()), Optional
                                            .of(notification.getValue()), false);
                                }
                            } else {
                                return new State<>(state.list,
                                        Optional.of(notification.getValue()), false);
                            }
                        })
                // emit lists from state
                .flatMap(state -> {
                    if (state.completed) {
                        // flush the leftover batch, if there is one
                        if (state.list.isEmpty())
                            return Observable.empty();
                        else
                            return Observable.just(state.list);
                    } else if (state.value.isPresent()) {
                        // the partitionId just changed, so the accumulated list is complete
                        return Observable.just(state.list);
                    } else {
                        return Observable.empty();
                    }
                })
                // get as a list of lists to check
                .toList().toBlocking().single();
        assertEquals(Arrays.asList(Arrays.asList(1, 1), Arrays.asList(2, 2, 2), Arrays.asList(3)),
                lists);
    }

    private static int partitionId(Integer n) {
        return n;
    }

    private static final class State<T> {
        final List<T> list;
        final Optional<T> value;
        final boolean completed;

        State(List<T> list, Optional<T> value, boolean completed) {
            this.list = list;
            this.value = value;
            this.completed = completed;
        }
    }

}

Bear in mind that this code was written quickly and might have holes. Be sure to unit test your adaptation of it thoroughly.

Note also that because materialize, scan and flatMap all support backpressure, the resulting transformation supports backpressure too, so it can be safely combined with zip.