2
votes

If I have an observable that takes a long time to execute. We will call it longObservable which is of type Observable, takes 5 seconds to execute, and each time it executes it emits a new string only once then completes.

longObservable(): Subject<string> {
    return timer(5000).pipe{
        map(() => randomString())
    }
}

And some other pages call it multiple times. If it's in process, I'd like to just continue that one process. If it's complete, I'd like to start it again.

longObservable.subscribe() // Immediate, start the timer

and this runs two seconds later:

longObservable.subscribe() // Get the same string as
                            //  the above subscription in 3 seconds.

and this runs 20 seconds later

longObservable.subscribe() // Start a new iteration and wait
                            // 5 seconds, get a new string.

The second subscribe I think is easy, it'll work how I want. It's the 3rd one that I'm having trouble with. It will emit the same value as the other two immediately since longObservable is complete.

This is being used for geolocation on a device. I want to request a new location, but if there is already a request in progress, just use that result.

Edit: Changed observable to subject for multicasting, removed take(1).

Edit2: https://stackblitz.com/edit/angular-venpk4 here is a working example of what I want. I'm hoping to accomplish this without the timerRunning variable and with RxJS operators. It's under the hello component and prints to the console.

3
Your question seems confusingAnoop Mc
note: take(1) in your example is not necessaryJota.Toledo

3 Answers

2
votes

Tricky problem. Here is my solution in a StackBlitz. A couple of keys to this are the share() operator, which effectively turns the observable into a subject, without having to declare the subject explicitly. However, you need a NEW subject to be created with a new subscription once the old one completes, so I created a factory function to either return the existing sharable Observable (if longObservable() is still in progress) or else generate a new one.

Here are the important bits from the StackBlitz:

let inProgress: boolean = false;

function longObservable(): Observable<string> {
    return timer(5000).pipe(
        map(() => randomString()),
        tap(() => inProgress = false),
        share()
    )
}

let obs$: Observable<string>;

function getLongObs(): Observable<string> {
    if (inProgress) {
        return obs$
    } else {
        inProgress = true;
        obs$ = longObservable();
        return obs$;
    }
}

console.log('initiate first subscribe');
getLongObs().subscribe(
    rand => console.log(`First subscribe returned ${rand}`)
);

setTimeout(() => {
    console.log('initiate second subscribe');
    getLongObs().subscribe(
        rand => console.log(`Second subscribe returned ${rand}`)
    );
}, 2000);

setTimeout(() => {
    console.log('initiate third subscribe');
    getLongObs().subscribe(
        rand => console.log(`Third subscribe returned ${rand}`)
    );
}, 7000)

I hope this helps!

1
votes

As you show in your example, your method returns new Observable instance, which creates every time you subscribe to it. I think in your service you can create a property, which will store your observable. May be better to make this property as BehaviorSubject. And wherever you want you can subscribe to this property. So every subscriprion will be to the same Observable instance.

1
votes

I think what you want is the share() pipe. Something like this works:

    export class AppComponent {
        private _longObservable: Observable<string> = null

        constructor() {
            this._longObservable = timer(5000).pipe(
                // This will show us when timer emits a value which will prove that the
                // first two subscriptions below are sharing the same "execution"
                // of the observable.
                tap(() => console.log("Timer Triggered!")), 
                map(() => randomString()),
                share()
            );
        }

        ngOnInit() {
            // These two will share the observable,
            // since long observable hasn't completed by the time the second
            // subscription is triggered.
            this._longObservable.subscribe(console.log);
            setTimeout(() => this._longObservable.subscribe(console.log), 2000);

            // This subscription occurs after the 5 sec.
            // Since timer is a cold observable, this will trigger it to run again.
            setTimeout(() => this._longObservable.subscribe(console.log), 7000);
        }


    }

Output:

Timer Triggered!
randomString1
randomString1
Timer Triggered!
randomString2

Here is an article on the difference between hot and cold observables if you aren't familiar with that distinction: https://medium.com/@benlesh/hot-vs-cold-observables-f8094ed53339

Http requests in angular, and timer(5000) are both cold observables.

Here is a link to some info about the share pipe: https://www.learnrxjs.io/operators/multicasting/share.html