1
votes

Let's say we have 3 observables, A, B, and C. I need to run all 3 at the same time (asynchronously, for the layman), but:

  1. If I get anything from A, emit it... do not emit anything else.
  2. If A completes without emitting anything, apply rule 1 to B.
  3. If B completes without emitting anything, emit items from C.
  4. If C completes without emitting anything, emit a default item.

I spent hours trying to figure this out yesterday, and there doesn't seem to be any operational combination already in RxJava that will let me do this.

You can think of the values cascading from left to right:

A --> B --> C

and also, the cascade is blocked, while each runs async and caches their values.

A (nothing) --> B (nothing) --> C (nothing) --> default item

To be clear, A must complete before anything is emitted from any other observer. The same logic for B, then for C, then comes the default if A, B, C fail to emit anything.

Obviously there is caching involved, and I absolutely do NOT want to replay the observable over. I'm going to need to replay the cached values. That are held up at each gate.

The behavior is extremely similar to concat() except the next part of the chain is not unleashed if there were emissions before it.

2

2 Answers

0
votes

Here's what I came up with:

**
 * Works like {@link rx.Observable#concat} but concatenated Observables
 * are all run immediately on their given {@link rx.Scheduler}.
 *
 * This Observable is blocking in the sense that items are emitted in order
 * like {@link rx.Observable#concat} but since each Observable is run on
 * an (possibly) asynchronous scheduler, items emitted further down the chain
 * of Observables are held until items further up the chain are (possibly) emitted.
 *
 * This Observable also short-circuits and does not emit items further down
 * the chain of Observables when an Observable higher up the chain emits items.
 *
 * For example:
 *
 * Given Observable A, B, and C
 *
 * If A emits item(s) emit them... do not emit anything else.
 * If A completes without emitting anything, apply previous rule to B.
 * If B completes without emitting anything, emit items from C (if any)
 *
 * @param <T>
 */
public class ConcatObservable<T> {
  private final List<Observable<? extends T>> observables;

  private ConcatObservable(List<Observable<? extends T>> observables) {
    this.observables = observables;
  }

  public static <T> ConcatObservable<T> from(Observable<? extends T>... observables) {
    return new ConcatObservable<T>(Arrays.asList(observables));
  }

  public Observable<T> asObservable() {
    final List<Subscription> subscriptions = new CopyOnWriteArrayList<Subscription>();

    return Observable.create(new Observable.OnSubscribe<T>() {
      @Override public void call(final Subscriber<? super T> subscriber) {
        List<Observable<? extends T>> cachedObservables = new ArrayList<Observable<? extends T>>();
        for (Observable<? extends T> observable : observables) {

          // tell it to cache values
          final ReplaySubject<T> subject = ReplaySubject.create();
          cachedObservables.add(subject);

          // run it with nobody listening
          Subscription subscription = observable.subscribe(new Observer<T>() {
            @Override public void onCompleted() {
              subject.onCompleted();
            }

            @Override public void onError(Throwable e) {
              subject.onError(e);
            }

            @Override public void onNext(T item) {
              subject.onNext(item);
            }
          });
          subscriptions.add(subscription);
        }

        final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

        // for the cached ones, already running
        for (Observable<? extends T> observable : cachedObservables) {

          final AtomicBoolean shouldExit = new AtomicBoolean(false);
          final CountDownLatch latch = new CountDownLatch(1);
          Subscription subscription = observable.subscribe(new Observer<T>() {
            @Override public void onCompleted() {
              latch.countDown();
            }

            @Override public void onError(Throwable e) {
              error.set(e);
              shouldExit.set(true);
              latch.countDown();
            }

            @Override public void onNext(T item) {
              subscriber.onNext(item);
              shouldExit.set(true);
            }
          });

          // Track each subscription
          subscriptions.add(subscription);

          try {
            // Wait for this one to stop emitting, or error
            latch.await();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for subscription to complete", e);
          }

          // This one had an item(s), so we don't bother with the rest
          if (shouldExit.get()) {
            break;
          }
        }

        // Release inner subscriptions
        for (Subscription subscription : subscriptions) {
          subscription.unsubscribe();
        }

        // Obey the Observable contract...
        Throwable throwable = error.get();
        if (throwable != null) {
          subscriber.onError(throwable);
        } else {
          subscriber.onCompleted();
        }
      }
    });
  }
}

And here are the corresponding tests:

public class ConcatObservableTest {

  @Test @SuppressWarnings("unchecked")
  public void it_onlyEmitsFromFirstObservable() {
    Observable<String> A = Observable.from(Arrays.asList("A", "A", "A"));
    Observable<String> B = Observable.from(Arrays.asList("B", "B", "B"));
    Observable<String> C = Observable.from(Arrays.asList("C", "C", "C"));

    Observable<String> observable = ConcatObservable.from(A, B, C).asObservable();

    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
    observable.subscribe(testSubscriber);

    assertThat(testSubscriber.getOnNextEvents()).containsExactly("A", "A", "A");
  }

  @Test @SuppressWarnings("unchecked")
  public void it_onlyEmitsFromSecondObservable() {
    Observable<String> A = Observable.empty();
    Observable<String> B = Observable.from(Arrays.asList("B", "B", "B"));
    Observable<String> C = Observable.from(Arrays.asList("C", "C", "C"));

    Observable<String> observable = ConcatObservable.from(A, B, C).asObservable();

    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
    observable.subscribe(testSubscriber);

    assertThat(testSubscriber.getOnNextEvents()).containsExactly("B", "B", "B");
  }

  @Test @SuppressWarnings("unchecked")
  public void it_onlyEmitsFromLastObservable() {
    Observable<String> A = Observable.empty();
    Observable<String> B = Observable.empty();
    Observable<String> C = Observable.from(Arrays.asList("C", "C", "C"));

    Observable<String> observable = ConcatObservable.from(A, B, C).asObservable();

    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
    observable.subscribe(testSubscriber);

    assertThat(testSubscriber.getOnNextEvents()).containsExactly("C", "C", "C");
  }

  @Test @SuppressWarnings("unchecked")
  public void it_shouldStartAllObservables() {
    TestObservable<String> letters = TestObservable.createTestObservable("A", "B", "C");
    TestObservable<String> numbers = TestObservable.createDelayedTestObservable(100, "1", "2", "3");
    TestObservable<String> animals = TestObservable.createDelayedTestObservable(200, "zebra", "donkey", "unicorn");

    Observable<String> observable = ConcatObservable.from(letters, numbers, animals).asObservable();

    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
    observable.subscribe(testSubscriber);

    assertThat(letters.isCalled()).isTrue();
    assertThat(numbers.isCalled()).isTrue();
    assertThat(animals.isCalled()).isTrue();
  }

  static class TestObservable<T> extends Observable<T> {
    private final TestOnSubscribe<T> onSubscribeFunc;

    private TestObservable(TestOnSubscribe<T> f) {
      super(f);
      onSubscribeFunc = f;
    }

    public boolean isCalled() {
      return onSubscribeFunc.isCalled();
    }

    @SuppressWarnings("unchecked")
    public static <T> TestObservable<T> createTestObservable(final T... items) {
      return createDelayedTestObservable(0, items);
    }

    @SuppressWarnings("unchecked")
    public static <T> TestObservable<T> createDelayedTestObservable(final long delay, final T... items) {
      return new TestObservable<T>(new TestOnSubscribe<T>(delay, items));
    }

    private static class TestOnSubscribe<T> implements OnSubscribe<T> {
      private final long delay;
      private final T[] items;
      private boolean isCalled;

      private TestOnSubscribe(long delay, T... items) {
        this.delay = delay;
        this.items = items;
      }

      @Override public void call(Subscriber<? super T> subscriber) {
        isCalled = true;

        for (T item : items) {
          if (delay > 0) {
            sleep(delay);
          }
          subscriber.onNext(item);
        }
        subscriber.onCompleted();
      }

      public boolean isCalled() {
        return isCalled;
      }

      private void sleep(long time) {
        try {
          Thread.sleep(time);
        } catch (InterruptedException e) { }
      }
    }
  }
}
0
votes
public class ConcatObservable<T> {

private final List<Observable<? extends T>> observables;

private ConcatObservable(List<Observable<? extends T>> observables) {
    this.observables = observables;
}

public static <T> ConcatObservable<T> from(Observable<? extends T>... observables) {
    return new ConcatObservable<T>(Arrays.asList(observables));
}

public Observable<T> asObservable() {
    return Observable.create(new Observable.OnSubscribe<T>() {
        @Override
        public void call(final Subscriber<? super T> subscriber) {
            List<Observable<? extends T>> cachedObservables = new ArrayList<Observable<? extends T>>();
            for (Observable<? extends T> observable : observables) {
                ConnectableObservable<? extends T> replayedObservable = observable.replay();
                cachedObservables.add(replayedObservable);
                subscriber.add(replayedObservable.connect());
            }
            Subscription s = Observable.concat(Observable.from(cachedObservables)).take(1).subscribe(subscriber);
            subscriber.add(s);
        }
    });
}
}

Edit

This is close, but it fails the following test:

@Test @SuppressWarnings("unchecked")
public void it_onlyEmitsFromFirstObservable() {
  Observable<String> A = Observable.from(Arrays.asList("A", "A", "A"));
  Observable<String> B = Observable.from(Arrays.asList("B", "B", "B"));
  Observable<String> C = Observable.from(Arrays.asList("C", "C", "C"));

  Observable<String> observable = ConcatObservable.from(A, B, C).asObservable();

  TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
  observable.subscribe(testSubscriber);

  assertThat(testSubscriber.getOnNextEvents()).containsExactly("A", "A", "A");
}