11
votes

So I'm experimenting with ngrx & ngrx/effects by building the following sandbox:

https://stackblitz.com/edit/ngrx-vanilla

Quick intro:

  • it has a root store in app/store
  • it has lazy loading of two modules in app/features
  • it has singleton services in app/commons

Three pages:

  • action items: routing to this page triggers the random generation of three goofy corporate action items
  • users: a basic master > detail redux implementation with router support
  • meeting: the place that raises my question; click "start meeting" to witness a relevant exchange of ideas.

Question & context:

  • I understand that all data updates in redux are to happen via actions
  • the "effects" library handles async events in order to dispatch new actions based on 3rd party events and async calls.
  • the app/common/meeting/service imitates the behavior of, for instance, a websocket or a firebase realtime DB pushing updates.

Upon receiving an update (illustrated in app/store/effects/meeting.effects.ts), a new action is dispatched.

Finally, the question: Is it a clean practice to have a common service know about the store? Where is the best place to register a listener to a websocket / firebase realtime db in order to dispatch actions upon data being pushed?

Here, I made it so that an effect (meeting.effects) reacts to the meetingActions.START_MEETING action type and, whenever data is pushed, dispatches an update order to the store. This feels wrong, though, for a couple of reasons:

  • Hard to unit test in isolation (needs more context than itself)
  • In case of a "stop meeting" action, this approach needs to store a Subscription (or?) in order to stop the subscription. In my approach, there's no control over the observable being created in the wild.

How are such cases usually handled?

3 Answers

11
votes

Assuming the websocket emits different kinds of events, map each event to a different action in a websocket service, like this:

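import { Injectable } from '@angular/core';
import { Action } from '@ngrx/store';
import { Observable } from 'rxjs';
import { map, shareReplay } from 'rxjs/operators';
// RxJS 6+: webSocket is a standalone function (it was Observable.webSocket in RxJS 5)
import { webSocket } from 'rxjs/webSocket';

@Injectable()
class WebsocketService {
  private socket$: Observable<Action>;

  getActions(): Observable<Action> {
    if (!this.socket$) {
      // Created lazily on first call; shareReplay(1) shares one connection among subscribers
      this.socket$ = webSocket(url).pipe(
        map(functionToMapActions),
        shareReplay(1)
      );
    }
    return this.socket$;
  }
}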

Here functionToMapActions maps websocket events to actions. I suggest adding the shareReplay operator at the end so that all subscribers share a single websocket connection instead of each opening its own.
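As an illustration of what functionToMapActions could look like, here is a minimal sketch. The message shape (a `kind` tag plus optional `data`) and the action type strings are assumptions made up for this example; they are not part of the sandbox or of any library. The local `Action` interface only mirrors the shape of the one exported by @ngrx/store so that the snippet runs on its own.

```typescript
// Mirrors the shape of `Action` from @ngrx/store so the sketch is standalone.
interface Action {
  type: string;
}

// Hypothetical action shape carrying the pushed data.
interface PayloadAction extends Action {
  payload?: unknown;
}

// Hypothetical wire format: the server tags every message with a `kind`.
interface SocketMessage {
  kind: string;
  data?: unknown;
}

// Pure function: one websocket message in, one action out.
function functionToMapActions(message: SocketMessage): PayloadAction {
  switch (message.kind) {
    case 'meeting.message':
      return { type: '[Meeting] Message Received', payload: message.data };
    case 'meeting.ended':
      return { type: '[Meeting] Ended' };
    default:
      // Unrecognized messages still become an action, so nothing is dropped silently.
      return { type: '[Socket] Unknown Message', payload: message };
  }
}
```

Because the mapper is a pure function, it can be unit tested without a store, an effect, or a live socket.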

webSocket (Observable.webSocket before RxJS 6) connects to the websocket and emits events as they arrive.

The websocket connection is established when you subscribe to websocketService.getActions().

You can subscribe to the websocket actions when the effects are initialized:

@Effect()
init$ = this.websocketService.getActions();

This will emit all actions as soon as your app starts (if the effect is in the root module), or when your module is loaded if it is in a lazy-loaded module.

Or, if you are interested in only a limited set of actions, you can do it like this:

@Effect()
init$ = this.websocketService.getActions().pipe(filter(filterNeededActionsHere));
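The filterNeededActionsHere predicate above is left open; one possible shape is sketched below. The list of action types is an assumption for illustration only, and the local `Action` interface just mirrors the one from @ngrx/store so the snippet is self-contained.

```typescript
// Mirrors the shape of `Action` from @ngrx/store so the sketch is standalone.
interface Action {
  type: string;
}

// Hypothetical list of the action types this feature wants to let through.
const NEEDED_ACTION_TYPES: readonly string[] = [
  '[Meeting] Message Received',
  '[Meeting] Ended',
];

// Predicate suitable for the RxJS `filter` operator: true keeps the action.
function filterNeededActionsHere(action: Action): boolean {
  return NEEDED_ACTION_TYPES.indexOf(action.type) !== -1;
}
```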

You can also start listening for actions only after a particular event, like this:

@Effect()
init$ = this.actions$.pipe(
  ofType('Event which marks start'),
  switchMapTo(this.websocketService.getActions())
);

As in the previous example, you can filter the actions here in the same way.
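For the "stop meeting" case raised in the question, the underlying idea is that exactly one place owns the live subscription and tears it down on stop. In an effect this is commonly written with switchMap on the start action plus takeUntil on the stop action; the sketch below shows the same lifecycle in plain TypeScript, without RxJS, so that it runs standalone. `PushSource`, `MeetingFeed`, and the action type string are hypothetical names invented for this example.

```typescript
interface Action {
  type: string;
  payload?: unknown;
}

// Hypothetical push source: subscribing returns a teardown function.
type Unsubscribe = () => void;
interface PushSource<T> {
  subscribe(listener: (value: T) => void): Unsubscribe;
}

// Owns the single live subscription, so "stop" always has something to tear down.
class MeetingFeed {
  private teardown: Unsubscribe | null = null;
  private readonly source: PushSource<string>;
  private readonly dispatch: (action: Action) => void;

  constructor(source: PushSource<string>, dispatch: (action: Action) => void) {
    this.source = source;
    this.dispatch = dispatch;
  }

  start(): void {
    this.stop(); // a restart replaces the previous subscription instead of stacking a second one
    this.teardown = this.source.subscribe(text =>
      this.dispatch({ type: '[Meeting] Message Received', payload: text })
    );
  }

  stop(): void {
    if (this.teardown) {
      this.teardown();
      this.teardown = null;
    }
  }
}
```

Since both the source and the dispatch function are passed in, the class can be tested with a fake source and an array that collects the dispatched actions.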

hope this answers your question

2
votes

Newer NgRx versions changed the syntax a little (createEffect, introduced in v8, replaces the @Effect() decorator). Use the following code for a root-level effects class:

init$ = createEffect(() => 
  this.actions$.pipe(
    ofType(ROOT_EFFECTS_INIT),
    // websocket events handling logic
    switchMap(() => webSocketEvents$)
  )
);

This is just a slightly adapted example from the docs.

If you work with a feature-level effects class, ROOT_EFFECTS_INIT won't work; you need to use the OnRunEffects lifecycle hook:

class UserEffects implements OnRunEffects  {
  ngrxOnRunEffects(resolvedEffects$: Observable<EffectNotification>) {
    // websocket events handling logic
    return webSocketEvents$.pipe(
      exhaustMap(() => resolvedEffects$)
    );
  }
}

A more detailed example is in the docs.

webSocketEvents$ in both examples is an Observable that can be constructed using one of the RxJS functions:

webSocket(socketUrl) // if you want to manage the websocket connection via RxJS
fromEvent(socketIoClient, eventName) // useful if the websocket connection is handled by socket.io-client; eventName is the socket.io event to listen for

1
votes

A custom example of a WebSocket emitting different kinds of events for a Vehicle entity, using an adapter.

1. Create a Vehicle-Socket-Adapter.ts

import { fromEvent, merge, Observable } from 'rxjs';
import { map } from 'rxjs/operators';
import { SocketIoService } from 'src/app/shared/services/socket-io.service';

export class VehicleSocketAdapter {
  public onVehicleEngineStateChange: Observable<boolean>;
  constructor(vehicleId: string, private socketIoService: SocketIoService) {

    // Use merge (imported from 'rxjs', not 'rxjs/operators') to combine multiple websocket events into one Observable

    this.onVehicleEngineStateChange = merge(
      fromEvent(this.socketIoService.socket, `event.vehicle.${vehicleId}.start`).pipe(
        map(vehicleStatus => true)
      ),
      fromEvent(this.socketIoService.socket,`event.vehicle.${vehicleId}.stop`).pipe(
        map(vehicleStatus => false)
      ),
    )
  }
}

2. Later, import the adapter wherever you would like to use it, for example in app.component.ts:

private subscriptions = new Subscription();
private listenForVehiclesState(vehicles) {
    vehicles.forEach((vehicle) => {
      const vehicleAdapter = new VehicleSocketAdapter(vehicle.id, this.webSocket);
      this.subscriptions.add( 
        vehicleAdapter.onVehicleEngineStateChange.subscribe(vehicleStatus => {
        
        // dispatch an action whenever the adapter emits a new engine state

        this.store.dispatch(
          VehiclesActions.BeginUpdateVehiclesStatusAction({
            payload: {
              vehicleId: vehicle.id,
              status: vehicleStatus
            }
          })
        );

      }));
    });
  }
3. Don't forget to unsubscribe from all subscriptions when your view or component is destroyed: this.subscriptions.unsubscribe();

Bonus: Socket as service snippet :)

import { Injectable } from '@angular/core';
import * as io from 'socket.io-client';

@Injectable({
  providedIn: 'root'
})
export class SocketIoService {

  public socket: SocketIOClient.Socket = io('/', {
    path: '/api/livedata',
    // extraHeaders must be nested inside the polling transport options object
    transportOptions: { polling: { extraHeaders: { 'AuthToken': 'ifAny' } } }
  });

  constructor() { }
}

Happy Coding :)