2
votes

In Java's PropertyChangeSupport, the observable maintains a list of observers. So if the observable is deleted, the observer becomes eligible for garbage collection.

So, an observable holds the link.
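
For illustration, here is a minimal sketch of that PropertyChangeSupport behaviour. The Temperature bean and its method names are hypothetical, made up for this example; only PropertyChangeSupport and PropertyChangeListener come from java.beans.

```java
import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;

// Hypothetical bean used only for this illustration.
class Temperature {
    // The observable side: PropertyChangeSupport stores the registered listeners.
    private final PropertyChangeSupport support = new PropertyChangeSupport(this);
    private int degrees;

    void addListener(PropertyChangeListener listener) {
        support.addPropertyChangeListener(listener);
    }

    void removeListener(PropertyChangeListener listener) {
        support.removePropertyChangeListener(listener);
    }

    // Number of listeners the observable currently references.
    int listenerCount() {
        return support.getPropertyChangeListeners().length;
    }

    void setDegrees(int newValue) {
        int oldValue = degrees;
        degrees = newValue;
        // Notifies every registered listener (no event if the value is unchanged).
        support.firePropertyChange("degrees", oldValue, newValue);
    }
}

class PropertyChangeLinkDemo {
    public static void main(String[] args) {
        Temperature temperature = new Temperature();
        PropertyChangeListener listener =
                evt -> System.out.println(evt.getPropertyName() + " changed to " + evt.getNewValue());

        temperature.addListener(listener);      // observable -> listener reference created
        temperature.setDegrees(21);             // listener is notified
        temperature.removeListener(listener);   // reference dropped again
        temperature.setDegrees(22);             // nobody is notified
    }
}
```

The listener itself holds no reference back to the Temperature instance here; the link lives only in the listener list inside PropertyChangeSupport.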

I have the impression that this is not true for ReactiveX. Although an observable has a subscribe method, that method returns a Subscription object. Does this object hold the link?

If I add this object to the observer with add(), is that the only way to make the observer hold the link? Or does the observer hold the link anyway?


1 Answer

0
votes

I think the Observable still maintains the connection to the Observer. Check out the code below.

private Observable<CurrencyRatesDTO> getCurrencyRatesObservable(Set<String> currencies) {
    return Observable.<CurrencyRatesDTO> create(sub -> {
        // Runs for each subscriber: fetch the rates from the REST service.
        CurrencyRatesDTO currencyRatesDTO = restTemplate.getForEntity(
                CURRENCY_SERVICE_API + RestUtil.QUERY_PARAM_START_SYMBOL
                        + RestUtil.getQueryParamStringForMultiValuedAttribute(SYMBOLS, currencies),
                CurrencyRatesDTO.class).getBody();
        // Hand the result to the subscriber, then signal completion.
        sub.onNext(currencyRatesDTO);
        sub.onCompleted();
    }).doOnNext(c -> log.debug("Currency rates were retrieved successfully."))
            .doOnError(e -> log.error("An ERROR occurred while retrieving the currency rates.", e));
}

We call a REST service when an Observer subscribes (the call is asynchronous only if the subscription runs on another thread, for example via subscribeOn). Once we get the result, we pass it to the Observer with onNext and then signal completion with onCompleted. I have written some blog posts [1][2][3] that you may find useful for RxJava. The following is a short excerpt from one of them. From it, it is fairly clear that the Observable holds the reference to the Observer.

RxJava provides Observables and Observers. Observables can send out values. Observers watch Observables by subscribing to them. Observers are notified when an Observable emits a value or when the Observable reports that an error has occurred. They are also notified when the Observable signals that it has no more values to emit. The corresponding methods are onNext, onError, and onCompleted from the Observer interface. An instance of Subscription represents the connection between an observer and an observable. If you call unsubscribe() on this instance, the connection is removed.
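
To make the role of the Subscription more concrete, here is a plain-Java model of the idea. ToySource and ToySubscription are hypothetical names, and this is not how RxJava is implemented internally. The sketch assumes a hot source that keeps its observers in a list, and only shows how an unsubscribe handle can cut the source-to-observer reference.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a Subscription: a handle that can cut the link.
interface ToySubscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

// Hypothetical hot source that keeps strong references to its observers.
class ToySource<T> {
    private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();

    ToySubscription subscribe(Consumer<T> observer) {
        observers.add(observer); // the source now references the observer
        return new ToySubscription() {
            private volatile boolean unsubscribed;

            @Override
            public void unsubscribe() {
                unsubscribed = true;
                observers.remove(observer); // drop the source -> observer reference
            }

            @Override
            public boolean isUnsubscribed() {
                return unsubscribed;
            }
        };
    }

    // Deliver a value to every observer that is still subscribed.
    void emit(T value) {
        for (Consumer<T> observer : observers) {
            observer.accept(value);
        }
    }

    boolean hasObservers() {
        return !observers.isEmpty();
    }
}
```

In this model the observer never needs to know the source. Whoever keeps the ToySubscription decides how long the source keeps referencing the observer.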

Hope this helps. Happy coding!

[1] http://ravindraranwala.blogspot.com/2016/12/introducing-java-reactive-extentions-in.html
[2] http://ravindraranwala.blogspot.com/2017/01/calling-exterenal-nosql-database.html
[3] http://ravindraranwala.blogspot.com/2017/01/combine-emissions-of-multiple_16.html