
I'm playing around with implementing my own observables or porting them from other languages for fun and profit.

The problem I've run into is that there's very little info on how to properly test observables or async code in general.

Consider the following test code:

// Create a stream of values emitted every 100 milliseconds
// `interval` uses Timer internally
final Stream<Number> stream = 
  Streams.interval(100).map(number -> number.intValue() * 10);

ArrayList<Number> expected = new ArrayList<>();

expected.add(0);
expected.add(10);
expected.add(20);

IObserver<Number> observer = new IObserver<Number>() {
  public void next(Number x) {
    assertEquals(x, expected.get(0));
    expected.remove(0);
    if(expected.size() == 0) {
      stream.unsubscribe(this);
    }
  }
  public void error(Exception e) {}
  public void complete() {}
};

stream.subscribe(observer);

As soon as the stream is subscribed to, it emits the first value and `next` is called once. Then the test method returns and the test passes, without waiting for the remaining values to be emitted and checked.

In JavaScript most test frameworks nowadays provide an optional Promise to the test case that you can call asynchronously on success/failure. Is anything similar available for Java?

1 Answer


Since the execution is asynchronous, the test has to wait until it finishes. You can simply wait for a fixed amount of time, the old-fashioned way:

 // your code that subscribes to the stream
 Thread.sleep(1000);
 // check the results
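A fixed sleep either wastes time or turns flaky when the machine is slow. As a rough sketch of the same "wait, then check" idea using only the JDK, a `CountDownLatch` lets the test thread block until the expected number of values has arrived, with a timeout as a safety net. The `LatchExample` class and its `collect` helper are hypothetical, and the `Timer` here only stands in for the question's `Streams.interval(100)`, whose implementation is not shown. Values are collected and returned to the test thread so that assertions run there, because an assertion that fails on the timer thread would not fail the test.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchExample {

    // Hypothetical stand-in for the stream under test: emits 0, 10, 20, ...
    // on a Timer thread and returns the values once `count` of them arrived.
    static List<Integer> collect(int count, long periodMillis, long timeoutMillis)
            throws InterruptedException {
        List<Integer> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(count);
        Timer timer = new Timer(true); // daemon thread, cannot keep the JVM alive

        timer.scheduleAtFixedRate(new TimerTask() {
            private int n = 0;

            @Override
            public void run() {
                if (done.getCount() == 0) {
                    return; // ignore ticks after the expected values have arrived
                }
                received.add(n * 10);
                n++;
                done.countDown();
            }
        }, 0, periodMillis);

        try {
            // Block the calling (test) thread until all values arrive or time runs out.
            if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new AssertionError("timed out, received so far: " + received);
            }
        } finally {
            timer.cancel(); // plays the role of unsubscribing
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(collect(3, 100, 2000));
    }
}
```

In a JUnit test, the body would call `collect(3, 100, 2000)` and compare the result against the expected list with `assertEquals`, so a mismatch or a timeout fails the test on the test thread.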

Or, if you use RxJava Observables, you can use TestSubscriber. The following example shows how, with an asynchronous operation, the test waits until the observer has consumed all items.

@Test
public void testObservableAsync() throws InterruptedException {
    // Subscribe the TestSubscriber itself so that it receives the terminal event
    TestSubscriber<Object> subscriber = new TestSubscriber<>();
    Observable.from(numbers)
            .doOnNext(increaseTotalItemsEmitted())
            .doOnNext(number -> System.out.println("Items emitted:" + total))
            .subscribeOn(Schedulers.newThread())
            .subscribe(subscriber);
    System.out.println("I finish before the observable finishes. Items emitted:" + total);

    // Block until onCompleted/onError arrives or the timeout elapses
    subscriber.awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
}

You can see more asynchronous examples here: https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java