Okay, here is how I would go about writing this.
Firstly, whatever class has the differentThan function should be changed to override equals instead. Otherwise you can't use a lot of basic methods with these objects.
For the purpose of this example I wrote all the observables using the Integer class as my type parameter. I then use a scheduler to write two mock methods:
static Observable<Integer> ratesFromNetwork(Scheduler scheduler) {
return Observable.<Integer>create(sub -> {
sub.onNext(2);
sub.onCompleted();
}).delay(99, TimeUnit.MILLISECONDS, scheduler);
}
static Observable<Integer> latestRatesFromDB(Scheduler scheduler) {
return Observable.<Integer>create(sub -> {
sub.onNext(1);
sub.onCompleted();
}).delay(99, TimeUnit.MILLISECONDS, scheduler);
}
As you can see both are similar, however, they will emit different values.
lack of the first one is considered an error
The best way to achieve this is to use a timeout. You can log the error immediately here and continue:
final Observable<Integer> networkRate = ratesFromNetwork(scheduler)
.timeout(networkTimeOut, TimeUnit.MILLISECONDS, scheduler)
.doOnError(e -> System.err.println("Failed to get rates from network."));
When the timeout fails an error will be thrown by rx. doOnError will give you a better idea of where this error started and let it propagate through the rest of the sequence.
The second one might be empty
In this case I would do a similar strategy, however, do not let the error propagate by using the method onErrorResumeNext. Now you can make sure the observable emits at least one value by using firstOrDefault. In this method use some dummy value that you expect to never match with the network results.
final Observable<Integer> databaseRate = latestRatesFromDB(scheduler)
.timeout(databaseTimeOut, TimeUnit.MILLISECONDS, scheduler)
.doOnError(e -> System.err.println("Failed to get rates from database"))
.onErrorResumeNext(Observable.empty())
.firstOrDefault(-1);
Now by using the distinct method you can grab a value only when it is different than the one that came before it (which is why you need to override equals).
databaseRate.concatWith(networkRate).distinct().skip(1)
.subscribe(i -> System.out.println("Updating to " + i),
System.err::println,
() -> System.out.println("completed"));
Here the database rate was placed before the network rate to take advantage of distinct. a skip is then added to always ignore the database rate value.
Complete Code:
final long networkTimeOut = 100;
final long databaseTimeOut = 100;
final TestScheduler scheduler = new TestScheduler();
final Observable<Integer> networkRate = ratesFromNetwork(scheduler)
.timeout(networkTimeOut, TimeUnit.MILLISECONDS, scheduler)
.doOnError(e -> System.err.println("Failed to get rates from network."));
final Observable<Integer> databaseRate = latestRatesFromDB(scheduler)
.timeout(databaseTimeOut, TimeUnit.MILLISECONDS, scheduler)
.doOnError(e -> System.err.println("Failed to get rates from database"))
.onErrorResumeNext(Observable.empty())
.firstOrDefault(-1);
databaseRate.concatWith(networkRate).distinct().skip(1)
.subscribe(i -> System.out.println("Updating to " + i),
System.err::println,
() -> System.out.println("completed"));
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
When networkTimeOut and databaseTimeOut are greater than 100 it prints:
Updating to 2
completed
When networkTimeOut is less than 100 it prints:
Failed to get rates from network.
java.util.concurrent.TimeoutException
When databaseTimeOut is less than 100 it prints:
Failed to get rates from database
Updating to 2
completed
And if you modify latestRatesFromDB and ratesFromNetwork to return the same value, it simply prints:
completed
And if you don't care about forcing timeouts or logging then it boils down to:
latestRatesFromDB().firstOrDefault(dummyValue)
.concatWith(ratesFromNetwork())
.distinct().skip(1)
.subscribe(this::save,
System.err::println,
() -> System.out.println("completed"));