i am trying to build an SSE using Angular.js single-page-app, and Play Framework as the backend. it seems like every time i submit a registration using angular's $http, the EventSource reconnects. because i am trying to make the UI flow smooth, on connection i add the push a NoOpUpdate over SSE, which sends the entire registered list, that the client takes and renders. on updates (registering a new user, etc) SSE will send the entire list but also the change (the added registrant, etc), and the client can tell by the event type what (and if) it needs to do (add the new registrant to the rendered list). but, for some reason, i keep getting the NoOpUpdate after submit, instead of the update event.
code below: SSE connection (in the angular controller - coffeescript notation):
$scope.sseOpen = () ->
alert("SSE connection opened")
return
$scope.sseMessage = (message) ->
jsonMessage = JSON.parse(message.data)
if (jsonMessage.type.indexOf("NoOpUpdate")==0
$scope.$apply () ->
list = $scope.list = jsonMessage.list
else if jsonMessage.type = "Added"
newOne = jsonMessage.affected
$scope.$apply () ->
$scope.list.push(newOne)
return
$scope.listen = () ->
$scope.serverListener = new EventSource('<address>')
$scope.serverListener.addEventListener('message', $scope.sseMessage, false)
$scope.serverListener.addEventListener('open', $scope.sseOpen, false);
return
$scope.listen()
calling the save method from the controller:
$scope.saveSuccessCallback = () ->
alert("save succeeded")
$scope.saveFailureCallback = () ->
alert("save failed")
$scope.save = ->
myData = {
...
}
AjaxFactory.save('<address>', myData, $scope.saveSuccessCallback, $scope.saveFailureCallback)
return
factory method for $HTTP:
save : (myURL, myData, successCallback, failureCallback)->
$http({
url: myURL,
method: "POST",
data: myData
})
.success (data, status, headers, config) ->
successCallback
.error (data, status, headers, config) ->
failureCallback
server side code:
def getSSE = Action { implicit request =>
Async {
Receiver.join map { join =>
Ok.feed(join._2 &> EventSource()).as("text/event-stream")
}
}
}
Receiver:
class Receiver extends Actor {
val (enumerator, channel) = Concurrent.broadcast[JsValue]
def receive = {
case Connect => {
sender ! Connected(enumerator)
}
case Save(newOne) => {
MyDao.save(newOne) map { saved =>
sender ! ActionSuccess("New one Created", Some(saved))
self ! notifyAll("Added", Some(saved)
}
}
case NoOpUpdate(eventType, affectedOne) => {
notifyAll("NoOpUpdate: " + eventType, affectedOne)
}
}
def notifyAll(eventType: String, affectedOne: Option[Person]) {
MyDao.findAll map { people =>
val msg = JsObject(
Seq(
"type" -> JsString(eventType),
"affectedOne" -> Json.toJson(affectedOne),
"list" -> Json.toJson(people)
)
)
channel.push(msg)
}
}
}
object Receiver {
val default = Akka.system.actorOf(Props[Receiver])
implicit val timeout = Timeout(1 second)
def update(eventType: String, affectedOne: Option[Person] = None) {
default ! NoOpUpdate(eventType, affectedOne)
}
def join: Future[(Iteratee[JsValue,_],Enumerator[JsValue])] = {
default ? Connect map {
case Connected(enumerator) => {
val iteratee = Iteratee.foreach[JsValue]{ js =>
Json.stringify(js \ "type") match {
case "save" => {
Json.fromJson[Person](js \ "person").asOpt map { jsOpt =>
default ? Save(jsOpt)
}
}
case _ => {
}
}
}
(iteratee, enumerator)
}
}
}
MyDAO:
def save(person: Person): Future[Person] = {
collection.save(person).map {
case ok if ok.ok =>
person
case error => throw new RuntimeException(error.message)
}
}
def findAll: Future[Seq[Guardian]] = {
collection.find(Json.obj())
.sort(Json.obj("_id" -> -1))
.cursor[Guardian]
.toList
}
lots of code and a long question. any idea?