I have a Subject with publishReplay and refcount, so it effectively never completes. The output of the refcount is an Observable array of objects. I would like to decompose the Observable object array into emitted elements, transform as needed, and then merge them back into an array, but since the source is non-completing, the array boundary is 'lost' in the asynchronous nature.
As an example, I use mergeMap and scan to accumulate the data. However, as expected, it continually accumulates, so the resulting array is the entire history of the stream. The initial value of the scan is only initialized at the beginning of the stream. I can't use toArray, because the Observable never terminates.
I have a lot of experience in asynchronous hardware design. The hardware analogy to this asynchronous chain is using a latch. I'm not sure what the conceptual equivalent is in RxJS. I would assume taking the emitted array output from refcount and applying it to Observable.from(theArrayOutput) would work, but I can't figure out how to insert it in the chain of streams.
import {Component, OnInit} from '@angular/core';
import {Observable} from 'rxjs/Observable';
import {Subject} from 'rxjs/Subject';
type IObjectsOperation = (types: Object[]) => Object[];
@Component({
selector: 'app-events-test',
templateUrl: './events-test.component.html',
styleUrls: ['./events-test.component.css']
})
export class EventsTestComponent implements OnInit {
public objects: Observable<Object[]>;
public scanned: Observable<any>;
protected updates: Subject<any> = new Subject<any>();
protected functionStream: Subject<any> = new Subject<any>();
protected addStream: Subject<Object> = new Subject<Object>();
private initialObjects: Object[] = [1];
constructor() {
this.objects = this.updates
.scan((objects: Object[],
operation: IObjectsOperation) => {
return operation(objects);
},
this.initialObjects)
.publishReplay(1)
.refCount();
this.functionStream
.map(function (message: Object): IObjectsOperation {
return (messages: Object[]) => {
return messages.concat(message);
};
})
.subscribe(this.updates);
this.addStream.subscribe(this.functionStream);
this.scanned = this.objects
.mergeMap(val => val)
// some transformation that I want to have happen, in this case no-op
.filter(() => {return true})
// attempt to rebuild array, but items are accumulated
.scan( (acc: Array<any>, x: Object) => { return acc.concat(x); }, [])
}
transform(objects) {
return Observable.from(objects)
// this withLatestFrom suggestion didn't work
// .withLatestFrom( this.functionStream, ( val, fn ) => fn( val ) )
.filter(() => {
return true
})
.toArray()
}
start(): void {
console.log('---------STARTING');
this.objects.mergeMap(obj => this.transform(obj))
.subscribe(
obj => console.log('objects: ' + obj)
);
this.scanned.subscribe(
{
next: obj => {
console.log('scanned: ' + obj);
},
error: () => {
},
complete: () => console.log('COMPLETED')
}
);
this.add(2);
this.add(3);
}
add(object: Object): void {
this.addStream.next(object);
}
ngOnInit() {
this.start();
}
}
The output is below, and expected.
---------STARTING
events-test.component.ts:49 objects: 1,2
events-test.component.ts:52 scanned: 1
events-test.component.ts:52 scanned: 1,2
events-test.component.ts:49 objects: 1,2,3
events-test.component.ts:52 scanned: 1,2,1
events-test.component.ts:52 scanned: 1,2,1,2
events-test.component.ts:52 scanned: 1,2,1,2,3
What I would like to see is:
---------STARTING
events-test.component.ts:49 objects: 1,2
events-test.component.ts:52 scanned: 1,2
events-test.component.ts:49 objects: 1,2,3
events-test.component.ts:52 scanned: 1,2,3
I have a few workarounds that I can use:
- Keep the data as an array throughout the stream
- Convert the source non-completing observable to completing
I would assume the solutions for this architecture are:
- There is some operation I can insert into the stream to take the array from objects Observable, and return an observable that completes
- There is some external signal that needs to occur from the source Observable that indicates 'processing' and then the termination bounds the scan. This seems clumsy to me.
I would assume there would be some equivalent in RxJS to a asynchronous hardware latch, so I'd like to keep the current architecture. Additionally, I think RxJS is really cool and want to close a gap in my knowledge in terms of time based handling of streams.
EDIT: A good answer from @xtianjohns indicated how to do an inner loop, but the subscription still does not complete. The suggested addition of withLatestFrom caused the transform function to break, in commenting out that line, the array is rendered, but the outer loop does not complete. If the line is present, the array is not rendered, and the loop does not complete.