Make a custom OnSubscribe implementation that does what you want:
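    import java.util.concurrent.atomic.AtomicBoolean;

    import rx.Observable;
    import rx.Observable.OnSubscribe;
    import rx.Subscriber;

    // Declared static because it is meant to be nested inside an enclosing class.
    public static class OnSubscribeRefreshingCache<T> implements OnSubscribe<T> {

        // true when the next subscription should build a fresh cache
        private final AtomicBoolean refresh = new AtomicBoolean(true);
        private final Observable<T> source;
        private volatile Observable<T> current;

        public OnSubscribeRefreshingCache(Observable<T> source) {
            this.source = source;
            this.current = source;
        }

        // Ask for a fresh cache on the next subscription.
        public void reset() {
            refresh.set(true);
        }

        @Override
        public void call(Subscriber<? super T> subscriber) {
            // Only the first subscriber after a reset swaps in a new cache.
            if (refresh.compareAndSet(true, false)) {
                current = source.cache();
            }
            current.unsafeSubscribe(subscriber);
        }
    }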
This bit of code demonstrates usage and shows that the cache is effectively being reset:
    Observable<Integer> o = Observable.just(1)
            .doOnCompleted(() -> System.out.println("completed"));
    OnSubscribeRefreshingCache<Integer> cacher =
            new OnSubscribeRefreshingCache<Integer>(o);
    Observable<Integer> o2 = Observable.create(cacher);
    // The first two subscriptions share one cache.
    o2.subscribe(System.out::println);
    o2.subscribe(System.out::println);
    // After reset(), the next subscription builds a new cache from the source.
    cacher.reset();
    o2.subscribe(System.out::println);
Output:
    completed
    1
    1
    completed
    1
By the way, you may notice that .cache doesn't emit until completion, which is why "completed" is printed before the first 1 in the output above. This is a bug that should be fixed by RxJava 1.0.14.
As for your GC pressure concerns: every operator applied to an Observable creates a new Observable, usually via lift or create. The baseline state for each new Observable is just the reference to its onSubscribe function. cache differs from most operators in that it holds state across subscriptions, so it can cause GC pressure if it holds a lot of state and is thrown away frequently. Even if you reused the same mutable data structure to hold the state across resets, the GC would still have to deal with its contents every time it was cleared, so you might not gain much.
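To illustrate that last point, here is a minimal plain-Java sketch. It does not use RxJava, and ReplayBuffer and its methods are hypothetical names made up for this comparison. It contrasts the two reset strategies: replacing the structure versus clearing and reusing it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical replay buffer, used only to compare two reset strategies. */
final class ReplayBuffer<T> {
    private List<T> items = new ArrayList<>();

    void add(T item) {
        items.add(item);
    }

    int size() {
        return items.size();
    }

    /** Strategy 1: drop the whole list; the list and its elements become garbage. */
    void resetByReplacing() {
        items = new ArrayList<>();
    }

    /** Strategy 2: reuse the list; its backing array survives, but the elements still become garbage. */
    void resetByClearing() {
        items.clear();
    }
}
```

In both cases the cached elements end up unreachable and have to be collected. Reusing the list only saves reallocating its backing array, which is usually the smaller part of the cost.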
The RxJava cache operator is built for multiple concurrent subscriptions, so you can probably imagine that reset functionality could be problematic to implement. By all means raise an issue on the RxJava GitHub project if you want to explore this further.
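To make the concurrency concern a bit more concrete, here is a minimal plain-Java analogue of a resettable cache. It is only a sketch: it does not use RxJava, ResettableMemo and Holder are hypothetical names, and it caches a single value rather than a stream. It keeps the cached value behind an AtomicReference so that reset and concurrent reads never observe a half-installed value.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical resettable memoizer; a plain-Java analogue, not part of RxJava. */
final class ResettableMemo<T> implements Supplier<T> {

    /** Wraps the value so that a cached null is distinguishable from "nothing cached". */
    private static final class Holder<V> {
        final V value;

        Holder(V value) {
            this.value = value;
        }
    }

    private final Supplier<T> source;
    private final AtomicReference<Holder<T>> cached = new AtomicReference<>();

    ResettableMemo(Supplier<T> source) {
        this.source = source;
    }

    /** Drops the cached holder; it becomes garbage once no caller still references it. */
    void reset() {
        cached.set(null);
    }

    @Override
    public T get() {
        while (true) {
            Holder<T> existing = cached.get();
            if (existing != null) {
                return existing.value;
            }
            // Under contention several threads may each compute a value,
            // but only one of them installs its holder.
            Holder<T> fresh = new Holder<>(source.get());
            if (cached.compareAndSet(null, fresh)) {
                return fresh.value;
            }
            // Lost the race (or a reset intervened): loop and re-read.
        }
    }
}
```

Even in this small case there are trade-offs: the source may be invoked more than once under contention, and a caller that read the holder just before reset() still receives the old value. A cache that replays a whole stream to many concurrent subscribers has considerably more state to coordinate.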