I want to send notifications to clients via WebSockets. These notifications are generated by actors, so I'm trying to create a stream of actor messages at server startup and subscribe WebSocket connections to this stream (sending only those notifications emitted since subscription).

With Source.actorRef we can create a Source of actor messages.

val ref = Source.actorRef[Weather](Int.MaxValue, fail)
                .filter(!_.raining)
                .to(Sink foreach println )
                .run()

ref ! Weather("02139", 32.0, true)

But how can I subscribe (Akka HTTP*) WebSocket connections to this source if it has already been materialized?

*WebSocket connections in Akka HTTP require a Flow[Message, Message, Any]

What I'm trying to do is something like

// at server startup
val notifications: Source[Notification,ActorRef] = Source.actorRef[Notification](5,OverflowStrategy.fail)
val ref = notifications.to(Sink.foreach(println(_))).run()
val notificationActor = system.actorOf(NotificationActor.props(ref))

// on ws connection
val notificationsWS = path("notificationsWS") {
  parameter('name) { name ⇒
    get {
      onComplete(flow(name)){
        case Success(f) => handleWebSocketMessages(f)
        case Failure(e) => throw e
      }
    }
  }
}

def flow(name: String) = {
  val messages = notifications filter { n => n.name equals name } map { n => TextMessage.Strict(n.data) }
  Flow.fromSinkAndSource(Sink.ignore, messages)
}

This doesn't work because the notifications source used here is not the one that was materialized, hence it doesn't emit any elements.

Note: I was using Source.actorPublisher and it was working, but ktoso discourages its use, and I was also getting this error:

java.lang.IllegalStateException: onNext is not allowed when the stream has not requested elements, totalDemand was 0.

1 Answer

You could expose the materialised actorRef to some external router actor using mapMaterializedValue.

Flow.fromSinkAndSourceMat(Sink.ignore, notifications)(Keep.right)
  .mapMaterializedValue(srcRef => router ! srcRef)

The router can keep track of your sources' ActorRefs (DeathWatch can help tidy things up) and forward messages to them.
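
A minimal sketch of such a router, assuming classic Akka actors. The names NotificationRouter and the shape of Notification (name, data) are hypothetical; the question does not show the real definitions. Each ActorRef the router receives is treated as a new subscriber and watched, so it is dropped once its WebSocket stream terminates.

```scala
import akka.actor.{Actor, ActorRef, Props, Terminated}

// Hypothetical message type; fields inferred from the question's usage (n.name, n.data).
final case class Notification(name: String, data: String)

// Hypothetical router: tracks the ActorRefs materialised by each connection's Source.actorRef.
class NotificationRouter extends Actor {
  private var subscribers = Set.empty[ActorRef]

  def receive: Receive = {
    case ref: ActorRef =>
      // A new connection registered its source actor; watch it for termination.
      context.watch(ref)
      subscribers += ref

    case Terminated(ref) =>
      // The source actor stopped (stream completed, failed, or was cancelled).
      subscribers -= ref

    case n: Notification =>
      // Fan the notification out to every live connection.
      subscribers.foreach(_ ! n)
  }
}

object NotificationRouter {
  def props: Props = Props(new NotificationRouter)
}
```

With this in place, the actors that generate notifications would send them to the router instead of to a single pre-materialised ref.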

NB: you're probably already aware, but note that by using Source.actorRef to feed your flow, your flow will not be backpressure aware (with OverflowStrategy.fail, the stream will simply fail once the buffer overflows under load).
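
If failing the stream on overflow is not acceptable, one option is a dropping strategy. The sketch below builds one source per connection, so each WebSocket gets its own materialised ActorRef. The buffer size of 256, the choice of OverflowStrategy.dropHead, the router parameter, and the Notification fields are all assumptions for illustration. It uses the two-argument Source.actorRef from the question, which newer Akka versions deprecate in favour of an overload with explicit completion and failure matchers.

```scala
import akka.actor.ActorRef
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

// Hypothetical message type; fields inferred from the question's usage (n.name, n.data).
final case class Notification(name: String, data: String)

object NotificationFlow {
  // Builds a fresh flow for each WebSocket connection.
  def flow(name: String, router: ActorRef): Flow[Message, Message, Any] = {
    val messages: Source[Message, ActorRef] =
      Source.actorRef[Notification](256, OverflowStrategy.dropHead) // drop oldest when full
        .filter(_.name == name)
        .map(n => TextMessage.Strict(n.data): Message)

    // Ignore incoming client messages; register the materialised ActorRef with the router.
    Flow.fromSinkAndSourceMat(Sink.ignore, messages)(Keep.right)
      .mapMaterializedValue(srcRef => router ! srcRef)
  }
}
```

The route could then call handleWebSocketMessages(NotificationFlow.flow(name, router)) directly, since the flow is built synchronously.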