2
votes

I'm trying to wrap an event-based API with RXJS Observables, but I can't figure out how to clean up after myself after an unsubscription happens.

This is pretty much how I would sync up the subscription with the Observable and the event-based API:

interface SomeEventService{
    registerListener(messageName: string, messageContent: Something): Listener;
    unregisterListener(listener: Listener);
}

class MyWrapper {
    private eventService: SomeEventService;

    streamOfSomething$(): Observable<Something> {
        return Observable.create((observer: Observer<Something>) => {
            // Create a listener, and relay events to the Observable
            const listener = this.eventService.registerListener("something", something => {
                observer.next(something);
            })
            // Now what?
        })
    }
}

The problem here is that at some time in the future I want to call eventService.unregisterListener, e.g when the consumer of streamOfSomething$ unsubscribes from the stream.

The question: How do I do this? As far as I can tell there is no event or callback I can use on my observer to run code after an unsubscribe happens.

1

1 Answers

2
votes
class MyWrapper {
    private eventService: SomeEventService;

    streamOfSomething$(): Observable<Something> {
        return Observable.create((observer: Observer<Something>) => {
            // Create a listener, and relay events to the Observable
            const listener = this.eventService.registerListener("something", something => {
                observer.next(something);
            })
            // Now what?
             return () => { this.eventService.unRegisterListener(); };
        })
    }
}

the function which you will return will be called when user call the unsubscribe()