
I have a custom set of objects that I want to emit one at a time, every 2 seconds. That is, when I add a set of items to a ReplaySubject<>, each object should be delivered to onNext() one by one, 2 seconds apart. The objects are added to the ReplaySubject<> dynamically in response to user events, which is why I'm using a ReplaySubject<>.

I have achieved that, but I also want to pause and resume the emission based on a condition that changes with user interaction. How can I achieve this?

Here's my code.

private ReplaySubject<CartItemToBeRemove> cartItemToBeRemoveSubject;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_cart);
        ButterKnife.bind(this);

        cartItemToBeRemoveSubject = ReplaySubject.create();

        initDeleteQueue();
    }

public void initDeleteQueue() {

        cartItemToBeRemoveSubject
                .delay(2,TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new Observer<CartItemToBeRemove>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(CartItemToBeRemove itemToBeRemove) {
                        cartData.remove(itemToBeRemove.getItem());
                        cartListAdapter.notifyItemRemoved(itemToBeRemove.getPosition());
                        Log.d(TAG, "item" + deletedItem.getName());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

I'm adding objects to the ReplaySubject like this:

cartItemToBeRemoveSubject.onNext(toBeRemove);
I don't have a definite answer for you, but you might look at zipping your subject with an Observable.interval. - cwbowron

1 Answer


You want to add pacing to an observable stream. Under normal pacing, one item is emitted every two seconds, but the pacing can be paused and resumed.

The basic pacing mechanism is something like Observable.interval( 2, TimeUnit.SECONDS ), which emits a Long every 2 seconds. However, you want to be able to pause it occasionally. Let's say we have an Observable<Boolean> that emits TRUE when the pacing mechanism should proceed and FALSE when it should pause.

Observable<Boolean> pacingControl;

pacingControl
  .switchMap( ctl -> ctl ? Observable.interval( 0, 2, TimeUnit.SECONDS ) : Observable.never() )
  .zipWith( cartItemToBeRemoveSubject, (t, item) -> item )
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe( ... );

pacingControl can be implemented as a subject. The user action then calls either pacingControl.onNext( TRUE ) to start or resume the process, or pacingControl.onNext( FALSE ) to pause it. The switchMap() operator switches between the interval timer and never(), which emits nothing, so no items are paired with a tick while the process is paused.
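To make the gating logic easier to follow, here is a minimal plain-JDK sketch of the same idea, written without RxJava. It is only a model of the behaviour, not the RxJava implementation: the class PacedQueue and its methods add, setRunning, and tick are hypothetical names, and tick() stands in for one firing of the 2-second interval. One assumption differs from the snippet above: a tick that finds the queue empty is simply dropped, whereas zipWith holds unmatched ticks until an item arrives.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

/** Plain-JDK model of gated pacing; all names here are hypothetical. */
final class PacedQueue<T> {
    // Plays the role of the items pushed into cartItemToBeRemoveSubject.
    private final Queue<T> pending = new ArrayDeque<>();
    // Last value "seen" on pacingControl; starts paused.
    private boolean running = false;

    /** Models cartItemToBeRemoveSubject.onNext(item). */
    synchronized void add(T item) {
        pending.add(item);
    }

    /** Models pacingControl.onNext(TRUE) or pacingControl.onNext(FALSE). */
    synchronized void setRunning(boolean running) {
        this.running = running;
    }

    /**
     * Models one firing of the 2-second timer.
     * Returns the next item when running and an item is queued, otherwise empty.
     */
    synchronized Optional<T> tick() {
        if (!running) {
            return Optional.empty(); // paused: the tick is ignored
        }
        return Optional.ofNullable(pending.poll()); // empty if nothing is queued
    }
}
```

On a real timer, tick() could be driven by something like ScheduledExecutorService.scheduleAtFixedRate with a 2-second period, with the result handed to the main thread before touching the adapter.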