0
votes

I'm trying to convert a Peer.JS Peer object into an observable sequence of dataConnections. However, as shown below, the observable of the peer that receives a request for a connection doesn't always emit the dataConnection to its subscriber, even though a connection is always established.

From what I've read, fromEvent() creates hot observables? Is my subscriber missing the dataConnection because it is not subscribing early enough?

import Rx from 'rx';
import Peer from 'peerjs';
import config from '../config';



/**
 *  Wraps a Peer.JS Peer as an Observable sequence of Peer.JS DataConnections.
 *
 *  Once the peer emits 'open', a connection is initiated (via peer.connect)
 *  for every id produced by connectionIds, and those outgoing connections are
 *  merged with the incoming ones announced by the peer's 'connection' event.
 *
 *  NOTE(review): the 'connection' stream is only merged in from inside the
 *  'open' callback, so a 'connection' event that fires before 'open' appears
 *  to be missed by subscribers.
 *
 *  @param {Peer} peer A Peer.js Peer
 *  @param {Observable} connectionIds An observable sequence of peer ids used to initiate dataConnections
 *  @returns {Observable} An observable sequence of DataConnections
 */
function fromPeer(peer, connectionIds) {
    const { fromEvent } = Rx.Observable;

    const open = fromEvent(peer, 'open');
    const remoteConnections = fromEvent(peer, 'connection');

    // Invoked when 'open' fires: dial every requested id and combine the
    // resulting outgoing connections with the incoming ones.
    const connectOnOpen = () => {
        const localConnections = connectionIds.map((id) => peer.connect(id));
        return remoteConnections.merge(localConnections);
    };

    // when(...) yields the observable built by connectOnOpen; mergeAll
    // flattens it into a single sequence of DataConnections.
    const plan = open.thenDo(connectOnOpen);
    return Rx.Observable.when(plan).mergeAll();
}



/**
 *  Manual smoke test: creates two peers, has "peer1" dial "peer2" while
 *  "peer2" dials no one, and logs every connection, error and completion
 *  reported by each peer's fromPeer stream. Raw 'open' / 'connection'
 *  listeners are attached afterwards so the event order shows up in the log.
 */
function test0() {
    const peer1 = new Peer("peer1", config.peer);
    const peer2 = new Peer("peer2", config.peer);

    const peer1connections = fromPeer(peer1, Rx.Observable.just("peer2"));
    const peer2connections = fromPeer(peer2, Rx.Observable.empty());

    // Both streams are observed the same way, so the handlers are shared.
    const onConnection = (conn) => { console.log("Got a connection: " + conn.peer); };
    const onError = (err) => { console.log("Error: " + err); };
    const onComplete = () => { console.log("complete"); };

    [peer1connections, peer2connections].forEach((connections) => {
        connections.subscribe(onConnection, onError, onComplete);
    });

    // Trace the underlying peer events next to the stream output.
    peer1.on('open', () => { console.log("open1"); });
    peer2.on('open', () => { console.log("open2"); });
    peer2.on('connection', () => { console.log("connect2"); });
}

// => Got a connection: peer2
// => open1
// => connect2
// => open2
// => Got a connection: peer1 -- This will not always be printed to the console.
1
Would it be easy for you to make a jsfiddle with this code to play with? – user3743222
Well, it seems that I miss events only when the 'connection' event fires before the 'open' event. It's as if any element from the remoteConnections stream is being lost before the open.thenDo(...) callback is fired. – omp

1 Answer

0
votes

I think I've solved this. The problem appears to be that if the peer emits 'connection' before it emits 'open', the results of those 'connection' events will not be merged into the observable that is returned by fromPeer.

So, I used a ReplaySubject as a buffer. The connectionSubject captures any DataConnections and releases them when the callback inside the flatMap operator runs.

Here is the modified fromPeer function. Hopefully, this will help someone!

/**
 * Wraps a Peer.JS Peer as an Observable sequence of DataConnections,
 * buffering incoming connections that arrive before the peer is open.
 *
 * @param {Peer} peer A Peer.js Peer
 * @param {Observable} connectionIds Observable of peer ids to connect to once the peer is open
 * @returns {Observable} Incoming and outgoing DataConnections; errors when the
 *   peer emits 'error' and ends when it emits 'close' or 'disconnected'
 */
function fromPeer(peer, connectionIds) {
  const { fromEvent, throwError } = Rx.Observable;

  const open = fromEvent(peer, 'open');
  // Each 'error' event is passed through throwError so it surfaces as an
  // error on the returned stream.
  const error = fromEvent(peer, 'error').flatMap(throwError);
  // Whichever of 'close' / 'disconnected' fires first ends the stream.
  const stop = fromEvent(peer, 'close').amb(fromEvent(peer, 'disconnected'));

  /*
   * The Peer can fire a connection event before it receives an open event.
   * Therefore, remote connections are captured in a replay buffer right away
   * and handed out once the open event arrives.
   *
   * NOTE(review): the buffer holds at most 10 connections, and the
   * subscription handle is discarded here, so this function never detaches
   * the 'connection' listener itself.
   */
  const connectionSubject = new Rx.ReplaySubject(10);
  const remoteConnections = fromEvent(peer, 'connection');
  remoteConnections.subscribe(connectionSubject);

  const connectionsAfterOpen = open.flatMap(() => {
    const localConnections = connectionIds.map((id) => peer.connect(id));
    return connectionSubject.merge(localConnections);
  });

  return connectionsAfterOpen.merge(error).takeUntil(stop);
}