I'm trying to figure out how to write an epic that subscribes to a websocket and then dispatches actions as events arrive from it.

The samples I've seen use multiplex and don't actually call subscribe on the websocket, and I'm a bit confused about how to adapt them.

I have started it like this, but I believe redux-observable wants an Observable to be returned:

const socket$ = Observable.webSocket<DataEvent>(
  "ws://thewebsocketurl"
);

const bankStreamEpic = (action$, store) =>
  action$.ofType(START_BANK_STREAM).mergeMap(action => {
    console.log("in epic mergeMap");
    socket$
      .subscribe(
        e => {
          console.log("dispatch event " + e);
          distributeEvent(e);
        },
        e => {
          logger.log("AmbassadorsDataService", "Unclean socket closure");
        },
        () => {
          logger.log("AmbassadorsDataService", "Socket connection closed");
        }
      )
  });

function distributeEvent(event: DataEvent) : void {
  //this.logger.log('AmbassadorsDataService', 'Event Received: ' + event.command + ' and id: ' + event.id);
  if (event.source === '/ambassadors/bank') {
    if (event.command === 'REMOVE') {
      removeDataEvent(event);
    }
    else if (event.command == 'ADD') {
      loadDataEvent(event);
    }
  }
}

It is throwing an error: Uncaught TypeError: You provided 'undefined' where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.

Any help would be appreciated!

Thanks

1 Answer

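In redux-observable, you will almost never (unless you know why I say "almost") call subscribe yourself. Instead, Observables are chained, and the middleware and other operators handle subscriptions for you. That is also where your error comes from: the mergeMap callback subscribes but returns nothing, so mergeMap receives undefined where it expects a stream.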

If all you want to do is dispatch an action for every event received, that's simple:

const socket$ = Observable.webSocket<DataEvent>(
  "ws://thewebsocketurl"
);

const bankStreamEpic = (action$, store) =>
  action$.ofType('START_BANK_STREAM')
    .mergeMap(action =>
      socket$
        .map(payload => ({
          type: 'BANK_STREAM_MESSAGE',
          payload
        }))
    );

You may (or may not) need more customization depending on the content of the messages received from the socket, but you might be better served placing that logic in your reducers, since it probably isn't side-effect related.
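As a rough sketch of what moving that logic into a reducer could look like: the source, command and id fields come from the distributeEvent function in the question, but everything else here is an assumption for illustration. The bankReducer name, the BankState shape (entries keyed by id) and id being a string are all hypothetical, so adapt them to whatever your store actually holds.

```typescript
// Fields taken from the question's distributeEvent; `id` being a string is an assumption.
interface DataEvent {
  source: string;
  command: string;
  id: string;
}

// Loose action shape, just enough for this sketch.
interface AnyAction {
  type: string;
  payload?: DataEvent;
}

// Hypothetical state shape: bank entries keyed by event id.
type BankState = { [id: string]: DataEvent };

function bankReducer(state: BankState = {}, action: AnyAction): BankState {
  // Ignore everything except messages emitted by the epic.
  if (action.type !== 'BANK_STREAM_MESSAGE' || !action.payload) {
    return state;
  }
  const event = action.payload;
  // Same source check that distributeEvent performed.
  if (event.source !== '/ambassadors/bank') {
    return state;
  }
  switch (event.command) {
    case 'ADD':
      // Return a new object instead of mutating the existing state.
      return { ...state, [event.id]: event };
    case 'REMOVE': {
      // Copy first, then drop the entry from the copy.
      const next: BankState = { ...state };
      delete next[event.id];
      return next;
    }
    default:
      return state;
  }
}
```

With this in place the epic stays a plain pass-through from socket to actions, and the ADD/REMOVE branching becomes an ordinary pure function that is easy to unit test.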

You probably will want a way to stop the stream, which is just a takeUntil:

const socket$ = Observable.webSocket<DataEvent>(
  "ws://thewebsocketurl"
);

const bankStreamEpic = (action$, store) =>
  action$.ofType('START_BANK_STREAM')
    .mergeMap(action =>
      socket$
        .map(payload => ({
          type: 'BANK_STREAM_MESSAGE',
          payload
        }))
        .takeUntil(
          action$.ofType('STOP_BANK_STREAM')
        )
    );

I used mergeMap because you did, but in this case I think switchMap is more apt, since having multiple of these streams running at once seems redundant, unless you really do need several and your question just omits something unique about each.