I am new to RxJS 6.0 (or any RxJS version, for that matter), and although I can see how powerful it is, some simple concepts escape me.

I have a situation where I would like to emit an extra value into the output stream based on each value from the source stream, but for the life of me I can't figure out how to do it. What I really need is a startWith operator that takes a function instead of a static value. Here is some silly code that sets up the scenario.

import { startWith, scan, tap, mergeMap, map, concat } from 'rxjs/operators';

interface IData {
  data: number;
  emitExtraVal: boolean;
}

class obsData implements IData {
  constructor(data: number) {
    this.data = data;
    this.emitExtraVal = false;
  }
  public data: number;
  public emitExtraVal: boolean;

}

class extraData implements IData {
  constructor(data: number) {
    this.data = data;
    this.emitExtraVal = true;
  }

  public data: number;
  public emitExtraVal: boolean;
}
const sourceOne = of(new obsData(1),new obsData(2),new obsData(3));
/*const finalSource = sourceOne.pipe(
  map((sData) => <IData>new extraData(sData.data)),
  map((sData) => sData)
);*/
const finalSource = sourceOne.pipe(
  mergeMap((sData) => concat(of(<IData>new extraData(sData.data), of(sData))))
);
const subscribe = finalSource.subscribe(val => console.log('Data:' + val.emitExtraVal));

What I want to do is output an instance of extraData carrying the number from an obsData, followed by the obsData I just took in from the source. This is not the exact scenario I am working on, but it demonstrates the core of what I'm trying to do: create an extra output followed by another output, both of which rely on a single source input.

The example above was updated based on comments; however, it does not run. It produces the following error:

You provided 'function (source) { return source.lift.call(concat_1.concat.apply(void 0, [source].concat(observables))); }' where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.

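---Update--- Here is the final version that works, thanks to the responses. The main problem was that concat can be imported from either rxjs/operators or rxjs. The one in rxjs/operators is a pipeable operator that returns a function, which is why the error above complains about a function where a stream was expected. If you call concat directly to build an Observable (as inside mergeMap here), you MUST import it from rxjs.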

// RxJS v6+
import { of, concat } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

interface IData {
  data: number;
  emitExtraVal: boolean;
}

class obsData implements IData {
  constructor(data: number) {
    this.data = data;
    this.emitExtraVal = false;
  }
  public data: number;
  public emitExtraVal: boolean;

}

class extraData implements IData {
  constructor(data: number) {
    this.data = data;
    this.emitExtraVal = true;
  }

  public data: number;
  public emitExtraVal: boolean;
}
const sourceOne = of(new obsData(1),new obsData(2),new obsData(3));

const finalSource = sourceOne.pipe(
  mergeMap((sData) => concat(of(<IData>new extraData(sData.data), <IData>sData)))
);
const subscribe = finalSource.subscribe(val => console.log('Data:' + val.emitExtraVal));
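
The import distinction described in the update can be seen side by side in a minimal sketch. The alias concatOperator and the result arrays are names chosen here for illustration only; the sketch assumes an RxJS version where 'rxjs/operators' still exports a concat operator, as it does in the 6.x line used in this question.

```typescript
import { of, concat } from 'rxjs';
import { concat as concatOperator, mergeMap } from 'rxjs/operators';

// Creation function from 'rxjs': takes Observables and returns an Observable,
// so it can be returned from the mergeMap projection.
const staticResult: number[] = [];
of(1, 2).pipe(
  mergeMap(n => concat(of(n * 10), of(n)))
).subscribe(v => staticResult.push(v));

// Pipeable operator from 'rxjs/operators': returns a function that expects
// a source Observable, so it is only meaningful directly inside pipe().
const operatorResult: number[] = [];
of(1, 2).pipe(
  concatOperator(of(3))
).subscribe(v => operatorResult.push(v));

console.log(staticResult);   // [10, 1, 20, 2]
console.log(operatorResult); // [1, 2, 3]
```

Returning the operator form from mergeMap hands it a function rather than a stream, which matches the error message quoted earlier.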
1 Answer

"create an extra output followed by another output, both of which rely on a single source input."

In my example, each id is the "single source input". The ids array is turned into an Observable of its elements with from, and each id is then mapped to an inner Observable built with concat, which emits the extra value followed by the id itself. A flattening operator (mergeMap or concatMap, depending on the requirements) then flattens all of these inner Observables into a single one. See the example:

const { Observable, of, from, concat } = rxjs; // = require("rxjs")
const { mergeMap } = rxjs.operators; // = require("rxjs/operators")

const complexAjaxCall = id => of(`${id}-from-ajax`);

const ids = [1, 2, 3];

from(ids).pipe(
  mergeMap(id => concat(
    complexAjaxCall(id),
    of(id)
  )),
).subscribe(e => console.log(e));
<script src="https://unpkg.com/[email protected]/bundles/rxjs.umd.min.js"></script>
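
If this pattern is needed in several places, it could be wrapped in a small custom operator, which comes close to the "startWith that takes a function" asked for in the question. This is only a sketch: startEachWith is a hypothetical helper name, not part of RxJS, and it uses concatMap (rather than mergeMap) on the assumption that each extra value should stay directly in front of its source value even if the inner Observables were asynchronous.

```typescript
import { Observable, of, concat } from 'rxjs';
import { concatMap } from 'rxjs/operators';

// Hypothetical helper (not an RxJS API): for each source value, emit
// makeExtra(value) first and then the value itself.
function startEachWith<T, R>(makeExtra: (value: T) => R) {
  return (source: Observable<T>): Observable<T | R> =>
    source.pipe(
      // concat here is the creation function from 'rxjs'.
      concatMap(value => concat(of(makeExtra(value)), of(value)))
    );
}

// Usage: collect the output so the ordering is easy to inspect.
const out: Array<number | string> = [];
of(1, 2, 3).pipe(
  startEachWith((id: number) => `${id}-extra`)
).subscribe(v => out.push(v));

console.log(out); // ['1-extra', 1, '2-extra', 2, '3-extra', 3]
```

With synchronous inner Observables such as of, mergeMap would give the same order; the difference only shows up once the inner sources emit asynchronously.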