1
votes

I'm using RxJava and the concat() and first() operators:

public Observable<List<Entity>> getEntities() {
    invalidateCacheIfNeeded();
    return Observable
            .concat(cachedEntities(), networkEntities())
            .first();
}

The cachedEntities returns an Observable built from a cached list while the networkEntities method fetches the entities with Retrofit.

This works great unless two user subscribes quickly to the observables returned by getEntities(). I guess the network request of the first subscribe is not finished when the second subscribe is made. In this case, two network requests are performed. Which I want to avoid.

I tried to create a single thread Scheduler so the the execution of the second call is only carried out when the first call is over but with no luck:

 mSingleThreadScheduler = Schedulers.from(Executors.newSingleThreadExecutor());

and:

public Observable<List<Entity>> getEntities() {
    invalidateCacheIfNeeded();
    return Observable
            .concat(cachedEntities(), networkEntities())
            .subscribeOn(mSingleThreadScheduler)
            .first();
}

I've tried to sprinkle the subscribeOn call lower in the Observable chain but I get the same result.

Any hint?

3
I believe you could simply use the synchronized keyword on your getEntities() declaration for your method and it should wait for a previous call to complete before allowing for another with a thread lock. - Jay Snayder
The concat() call is not blocking so getEntitiesreturns almost immediatly. synchronized on the method won't work in this case. - fstephany
But even if concat() is not blocking, whatever is calling getEntities() would be blocking while waiting for a return from the call wouldn't it? So you would think synchronized would be ok for this call if you were trying to keep it from being called multiple times. Unless I am missing something. How can it return immediately if it isn't finished its task? Maybe I can pick up a tidbit here. - Jay Snayder
The getEntities() only build the Observabl (which is super fast). The real work is done when someone subscribes to this observable. But you're right my wording is probably misleading. I'll update the question to reflect the subscribing phase. - fstephany

3 Answers

0
votes

Given

public Observable<List<Entity>> getEntities() {
    invalidateCacheIfNeeded();
    return Observable
            .concat(cachedEntities(), networkEntities())
            .first();
}

You should create an AsyncSubject<Data> mSubject and use it as follow

private Observable<List<Entity>> networkEntities() {
    return mSubject
            .map(Data::getEntities);
}

And your network call should look like this

public Observable<Data> getDataFromNetwork() {
    return networkOperation()
            .subscribeOn(mSingleThreadScheduler)
            .subscribe(mSubject);
}
1
votes

I think it is not a good idea to make a method thread-safe. Because it blocks the whole method thus decrease the performance. So it is recommended to make the data structure thread-safe. In your case your are using List in your method

public Observable<List<Entity>> getEntities() {

}

Use CopyOnWriteArrayList instead of List. It is thread safe.

public Observable<CopyOnWriteArrayList<Entity>> getEntities() {

}

Hope it will work.

0
votes

This seems to be the relatively common use case of one observable with multiple subscribers. You need something like

public Observable<List<Entity>> getEntities() {
    invalidateCacheIfNeeded();
    return Observable
            .concat(cachedEntities(), networkEntities())
            .first()
            .replay(1)            
}

See the answers to this question for a more in depth explanation.