I am new to RxJs 6.0 (or any RxJs version for that matter) and although I see how powerful it is, some simple concepts escape me.
I have a situation where i would like to emit an extra value into the output stream based upon the source stream but for the life of me I can't figure out how to do it. I really need a startsWith operator that can take a method instead of a static value and then I could achieve this. Here is some silly code that sets up the scenario.
import { startWith, scan, tap, mergeMap, map, concat } from 'rxjs/operators';
interface IData {
data: number;
emitExtraVal: boolean;
}
class obsData implements IData {
constructor(data: number) {
this.data = data;
this.emitExtraVal = false;
}
public data: number;
public emitExtraVal: boolean;
}
class extraData implements IData {
constructor(data: number) {
this.data = data;
this.emitExtraVal = true;
}
public data: number;
public emitExtraVal: boolean;
}
const sourceOne = of(new obsData(1),new obsData(2),new obsData(3));
/*const finalSource = sourceOne.pipe(
map((sData) => <IData>new extraData(sData.data)),
map((sData) => sData)
);*/
const finalSource = sourceOne.pipe(
mergeMap((sData) => concat(of(<IData>new extraData(sData.data), of(sData))))
);
const subscribe = finalSource.subscribe(val => console.log('Data:' + val.emitExtraVal));
What I want to do is output an instance of extraData with the number in obsData followed by the obsData I just took in from the source. This is not the exact scenario I am attempting but it demonstrates the core of what I'm trying to do which is to create an extra output followed by another output, both of which rely on the a single source input.
This updated example of the problem is based on comments, however this example won't run because the syntax is not correct
This produces the following error:
You provided 'function (source) { return source.lift.call(concat_1.concat.apply(void 0, [source].concat(observables))); }' where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.
---Update--- Here is the final answer that works thanks to the responses. The main problem I was having was that you can import concat from rxjs/operators or rxjs. You MUST import it from rxjs if you are using it in the pipe command.
// RxJS v6+
import { of, fromEvent, combineLatest, concat } from 'rxjs';
import { startWith, scan, tap, mergeMap, map } from 'rxjs/operators';
interface IData {
data: number;
emitExtraVal: boolean;
}
class obsData implements IData {
constructor(data: number) {
this.data = data;
this.emitExtraVal = false;
}
public data: number;
public emitExtraVal: boolean;
}
class extraData implements IData {
constructor(data: number) {
this.data = data;
this.emitExtraVal = true;
}
public data: number;
public emitExtraVal: boolean;
}
const sourceOne = of(new obsData(1),new obsData(2),new obsData(3));
const finalSource = sourceOne.pipe(
mergeMap((sData) => concat(of(<IData>new extraData(sData.data), <IData>sData)))
);
const subscribe = finalSource.subscribe(val => console.log('Data:' + val.emitExtraVal));