15
votes

I am trying to use switchMap to cancel any previous http calls in Angular2. The code is basically

var run = ():Observable<any> => {
        var url = 'http://...'
        return this._http.get(url)
            .map(result => {
                return var xmlData:string = result.text()

            });
    }


    function pollTasks() {
        return Observable.of(1)
            .switchMap(() => run())
            .map(res => res)
    }

    // caller can do subscription and store it as a handle:
    let tasksSubscription =
        pollTasks()
            .subscribe(data => {
                console.log('aa'+data)
            });

and so I call the entire source several times in a row and receive several replies (i.e.: aa+data)

I was under the impression switchMap should cancel the previous calls.

4
How do you call it several times?Günter Zöchbauer
well just for testing I simply run is as in: runCmd(); runCmd(); runCmd(); ...born2net
from the answer below I now understand that that it can't UNLESS it comes from the same originating stream... disappointing... but logical, tx!born2net
You could merge the input to a single stream, then it should work.Günter Zöchbauer
any possible code snippet on the merge would be awesome ...born2net

4 Answers

21
votes

The requests need to originate from the same underlying stream. Here's a factory function that will create an http service instance that should do what you want:

function httpService(url) {
   // httpRequest$ stream that allows us to push new requests
   const httpRequest$ = new Rx.Subject();

   // httpResponse$ stream that allows clients of the httpService
   // to handle our responses (fetch here can be replaced with whatever
   // http library you're using).
   const httpResponse$ = httpRequest$
       .switchMap(() => fetch(url));


   // Expose a single method get() that pushes a new
   // request onto the httpRequest stream. Expose the
   // httpResponse$ stream to handle responses.
   return {
       get: () => httpRequest$.next(),
       httpResponse$
   };
}

And now the client code can use this service like this:

const http = httpService('http://my.api.com/resource');

// Subscribe to any responses
http.httpResponse$.subscribe(resp => console.log(resp));

// Call http.get() a bunch of times: should only get one log!!
http.get();
http.get();
http.get();
7
votes
constructor(private _http:Http, private appStore:AppStore) {

        this.httpRequest$ = new Subject();


        this.httpRequest$
            .map(v=> {
                return v;
        })
        .switchMap((v:any):any => {
            console.log(v);
            if (v.id==-1||v.id=='-1')
                return 'bye, cancel all pending network calls';                
            return this._http.get('example.com)
                .map(result => {
                    var xmlData:string = result.text()
                });
        }).share()
        .subscribe(e => {                
        })
    ...

and to push data in:

this.httpRequest$.next({id: busId});        

this works great and I can now have a single service that I can pipe all network calls through as well as cancel prior calls...

see image below, as new calls come in, prior ones are canceled. note how I set slow network with a delay of 4sec to provide all is working as expected...

enter image description here

3
votes

I think when you use the switchMap operator, you can only cancel requests in the current data flow. I mean the events that occur on the same observable chain...

If you call your pollTasks method several times, you won't be able to the cancel previous requests because they won't be in the same data flow... You create an observable chain each time you call the method.

I don't know how you trigger the execution of your requests.

If you want to execute your request each 500ms, you could try this:

pollTasks() {
  return Observable.interval(500)
                .switchMap(() => run())
                .map(res => res.json());
}

In this case, if there are in-progress request after 500ms, they will be canceled to execute the new one

With the approach, you just need to call once the pollTasks method.

You can also trigger the asynchronous processing chain based on user events. For example, when characters are filled in inputs:

var control = new Control();
// The control is attached to an input using the ngFormControl directive
control.valueChanges.switchMap(() => run())
                .map(res => res.json())
                .subscribe(...);

There is a proposal to link / initiate more easily processing chain on DOM events (fromEvent)

See this link:

0
votes

You can use Subject but if you do so, you have to manage subscription and publish. If you just want a method returning Observable which cancel requests within a interval, here is how I would do that :

observer: Observer<CustomObject>;
debug = 0;

myFunction(someValue) {
    this.observable = new Observable<CustomObject>(observer => {
        if (!this.observer) {
            this.observer = observer;
        }

       // we simulate http response time
        setTimeout(() => {
            this.observer.next(someValue);
            this.debug++;
        }, 1000);
    })
    .debounceTime(3000) // We cancel request withing 3s interval and take only the last one
    .switchMap(fn)
    .do(() => {
        console.log("debug", this.debug);
    });
}

Using the same observer all along let us cancel requests according all we want (debounceTime, distinctUntilChanged, ...).