
I am trying to build SSE (server-sent events) into an Angular.js single-page app, with Play Framework as the backend. It seems that every time I submit a registration using Angular's $http, the EventSource reconnects. To keep the UI flow smooth, on connection I push a NoOpUpdate over SSE, which sends the entire registered list for the client to render. On updates (registering a new user, etc.) SSE sends the entire list plus the change (the added registrant, etc.), and the client can tell from the event type what (if anything) it needs to do, such as adding the new registrant to the rendered list. But for some reason I keep getting the NoOpUpdate after submit instead of the update event.

code below: SSE connection (in the angular controller - coffeescript notation):

  $scope.sseOpen = () ->
    alert("SSE connection opened")
    return

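  $scope.sseMessage = (message) ->
    jsonMessage = JSON.parse(message.data)
    # "NoOpUpdate: <eventType>" carries the full list; replace what is rendered
    if jsonMessage.type.indexOf("NoOpUpdate") is 0
      $scope.$apply () ->
        list = $scope.list = jsonMessage.list
    # "Added" carries the new registrant under the "affectedOne" key sent by the server
    else if jsonMessage.type is "Added"
      newOne = jsonMessage.affectedOne
      $scope.$apply () ->
        $scope.list.push(newOne)
    return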

  $scope.listen = () ->
    $scope.serverListener = new EventSource('<address>')
    $scope.serverListener.addEventListener('message', $scope.sseMessage, false)
    $scope.serverListener.addEventListener('open', $scope.sseOpen, false)
    return

  $scope.listen()

calling the save method from the controller:

  $scope.saveSuccessCallback = () ->
    alert("save succeeded")

  $scope.saveFailureCallback = () ->
    alert("save failed")

  $scope.save = ->
    myData = {
    ...
    }      
    AjaxFactory.save('<address>', myData, $scope.saveSuccessCallback, $scope.saveFailureCallback)
    return 

factory method for $http:

  save : (myURL, myData, successCallback, failureCallback)->
      $http({
        url: myURL,
        method: "POST",
        data: myData
      })
      .success (data, status, headers, config) ->
        # Invoke the callback; a bare reference would do nothing
        successCallback()
      .error (data, status, headers, config) ->
        failureCallback()

server side code:

  def getSSE = Action { implicit request =>
    Async {
      Receiver.join map { join =>
        Ok.feed(join._2 &> EventSource()).as("text/event-stream")
      }
    }
  }

Receiver:

class Receiver extends Actor {


  val (enumerator, channel) = Concurrent.broadcast[JsValue]

  def receive = {
    case Connect => {
      sender ! Connected(enumerator)
    }

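    case Save(newOne) => {
      // Capture the sender now; it is not safe to read it inside the Future callback
      val replyTo = sender
      MyDao.save(newOne) map { saved =>
        replyTo ! ActionSuccess("New one Created", Some(saved))
        // Call notifyAll directly; `self ! notifyAll(...)` would send Unit to the actor
        notifyAll("Added", Some(saved))
      }
    }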

    case NoOpUpdate(eventType, affectedOne) => {
      notifyAll("NoOpUpdate: " + eventType, affectedOne)
    }
  }

  def notifyAll(eventType: String, affectedOne: Option[Person]) {
    MyDao.findAll map { people =>
      val msg = JsObject(
        Seq(
          "type" -> JsString(eventType),
          "affectedOne" -> Json.toJson(affectedOne),
          "list" -> Json.toJson(people)
        )  
      )
      channel.push(msg)
    }
  }
}

object Receiver {
  val default = Akka.system.actorOf(Props[Receiver])
  implicit val timeout = Timeout(1 second)

  def update(eventType: String, affectedOne: Option[Person] = None) {
    default ! NoOpUpdate(eventType, affectedOne)
  }

  def join: Future[(Iteratee[JsValue,_],Enumerator[JsValue])] = {
    default ? Connect map {
      case Connected(enumerator) => {
        val iteratee = Iteratee.foreach[JsValue]{ js => 
          Json.stringify(js \ "type") match {
            case "save" => {
              Json.fromJson[Person](js \ "person").asOpt map { jsOpt => 
                default ? Save(jsOpt)                
              }
            }
            case _ => {

            }
          } 
        }
        (iteratee, enumerator)
      }
    }
  }
}

MyDAO:

  def save(person: Person): Future[Person] = {
    collection.save(person).map {
      case ok if ok.ok =>
        person
      case error => throw new RuntimeException(error.message)
    }
  }  

  def findAll: Future[Seq[Person]] = {
    collection.find(Json.obj())
      .sort(Json.obj("_id" -> -1))
      .cursor[Person]
      .toList
  }

lots of code and a long question. any idea?

1 Answer


Not 100% sure why it's closing, but looking at this:

$scope.serverListener.addEventListener('message', $scope.sseMessage, false)

You're subscribing to events of type message (and likewise open). But you're not sending a type with any of the messages you're sending (setting a property called type in the JSON is not the same as sending an SSE event type). To send a type, you need to implement a play.api.libs.EventSource.EventNameExtractor that will extract the name out of your event. Typically I do something like this:

https://github.com/typesafehub/play-mongo-knockout/blob/master/app/models/Event.scala

Then I make my enumerator an enumerator of these event types.
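
To make the difference between a JSON "type" property and a real SSE event type concrete, here is a minimal sketch of the text/event-stream wire format in plain Scala, with no Play dependency. The Event case class and the SseFrame.render helper are hypothetical names made up for illustration; they are not Play's API, only a rough picture of what a name extractor ends up adding to the stream. In the wire format, a frame with an "event:" line is dispatched to listeners registered under that name, while a frame without one is delivered as a plain message event.

```scala
// Hypothetical wrapper for illustration: an optional event name plus a payload.
final case class Event(name: Option[String], data: String)

object SseFrame {
  // Renders one event in the text/event-stream wire format.
  // The "event:" line is what carries the type; the JSON payload does not.
  def render(e: Event): String = {
    val nameLine = e.name.map(n => s"event: $n\n").getOrElse("")
    // Every line of the payload needs its own "data:" prefix.
    val dataLines = e.data.split("\n", -1).map(l => s"data: $l\n").mkString
    // A blank line terminates the frame.
    nameLine + dataLines + "\n"
  }
}
```

With a frame rendered from Event(Some("Added"), ...), the client would listen with addEventListener('Added', ...) instead of inspecting a type field inside the JSON.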

Also, I hope you know you never use the iteratee that you create in the join method. Perhaps you're also using that for a WebSocket, if that's the case, that's fine.

Anyway, to debug why it's closing... what does the network tab in the developer console in your browser say?