I have a stream of request objects:
let request$ = new Subject();
And I have a function that takes a request object and the current app state object, and returns a stream of new state objects:
function stream(request, state) {
  // Return a cold Observable of new state objects
}
When the first request is emitted, I want to call stream and pass in that request and an initial state object, and start processing the states that it returns.
Some time after that, a second request is emitted. The first stream needs to stop and be replaced with a new stream that is created by calling stream again, passing in the second request and the latest state that was emitted from the first stream.
I suspect that scan with a higher-order stream could do it, but I am stuck here:
request$.pipe(
  scan((state$, request) => {
    let previousState = ???
    return stream(request, previousState);
  }, of(initialState)),
  switchAll()
)
EDIT - Example
In real life, the stream function establishes a WebSocket connection to a server, sends parts of the request and state arguments to the server, and then emits new state objects as messages are received from the server.
To simplify for an example, let's say request and state are just numbers, and the stream function just repeatedly adds request to state, emitting one result per second:
function stream(request, state) {
  return interval(1000).pipe(
    map(i => state + (i + 1) * request)
  );
}
// e.g. stream(2, 10) will emit 12, 14, 16, 18, ...
The desired output would be:
initialState = 10
request$      2------------5-----------------
stream(2, 10) --12--14--16-|
stream(5, 16)              --21--26--31--36--
...
output        --12--14--16---21--26--31--36--
Notice how at the time the 5 request is emitted, both the 5 and the latest state 16 are required to build the next stream.