I have a WebSocket server in a Play app, and I want to move it to an akka-http service. I'm currently using ActorFlow.actorRef, which is a part of Play that doesn't exist in Akka.

When the WebSocket is accepted, I subscribe to a RabbitMQ queue, and forward each message to the WebSocket. When I receive a message from the WebSocket, I handle some locally and forward others on to a RabbitMQ exchange.

How would I do the same using akka-http? I can create a sink using Sink.actorRef and handle inbound messages there, but what about the source?

I can create a source with Source.actorRef, but how do I get access to the actor to send messages to when it's materialized? Is there a different kind of source I should use to send messages to from the foreach of my RabbitMQ subscription?

Once I have those, it looks like I can return the required flow using Flow.fromSinkAndSource.

Can you show what you've done so far so we can get more context? If you're looking for how to obtain the materialized value, which is the ActorRef in this case, mapMaterializedValue might be what you're looking for. - Qingwei
@Qingwei I've been through a couple of different iterations, but this is what I have at the moment (using code taken from Play's ActorFlow.actorRef). It closes the connection as soon as it's opened, though. gist.github.com/danellis/643e858d830dbea58bd84e265fb64110 - Isvara

1 Answer

I'll paraphrase your requirements.

You have a WebSocket endpoint which needs to:

  1. Process some requests locally and send the response back to the client
  2. Forward other requests to RabbitMQ
  3. Subscribe to RabbitMQ, and forward messages from RabbitMQ to the WebSocket client

My suggestion is to avoid writing your own actor unless it's necessary. Actors are powerful, but I find streams easier to read and reason about when they fit the model.

Below is how you can pipe a Source and a Sink together without writing an actor of your own. The source is still backed by an ActorRef, but Akka creates it for you when the stream is materialized.

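import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Sink, Source}

def wshandler: Flow[Message, Message, _] = {
  val rabbit = new Rabbit()
  val src =
    Source
      .actorRef[Message](100, OverflowStrategy.dropBuffer)
      .mapMaterializedValue { ref =>
        // runs once per materialization, i.e. once per WebSocket connection
        rabbit.subscribe[String]("updates", queueName, topics) { (body, topic) =>
          log.debug("Received from rabbit")

          // here you forward everything from RabbitMQ to the
          // client using the materialized ActorRef
          ref ! TextMessage(body)
        }
      }

  // you need to implement your own pattern matching logic to differentiate between requests to process
  // locally and requests to route to RabbitMQ; isLocalRequest is a hypothetical placeholder for that check
  // (without a guard, the first case would match every message)
  val sink = Sink.foreach[Message] {
    case localReq if isLocalRequest(localReq) => // your request/response processing logic

    case rabbitMq => // publish to RabbitMQ
  }

  Flow.fromSinkAndSource(sink, src)
}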
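
To expose a handler like this from akka-http, the handleWebSocketMessages directive accepts a Flow[Message, Message, Any]. Here is a minimal sketch of the wiring only: echoHandler is a hypothetical stand-in for wshandler (it just echoes strict text frames), and the "ws" path is an assumption, so adjust both to your service.

```scala
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Flow

object WsRouteSketch {
  // Hypothetical stand-in for wshandler: echoes strict text frames back
  // and ignores binary or streamed messages.
  val echoHandler: Flow[Message, Message, Any] =
    Flow[Message].collect {
      case TextMessage.Strict(text) => TextMessage(s"echo: $text")
    }

  // The "ws" path segment is an assumption; use whatever your clients connect to.
  val route: Route =
    path("ws") {
      handleWebSocketMessages(echoHandler)
    }
}
```

Swapping echoHandler for wshandler should be all that's needed, since the directive materializes the flow once for each accepted connection.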

The wshandler snippet is not based on the gist you posted, but I hope it solves your problem.
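
On the question of a different kind of source: Source.queue might also fit, since it materializes a queue you can offer elements to from a plain callback, and preMaterialize() hands you that queue together with a Source you can still pass to Flow.fromSinkAndSource. This is a sketch under assumptions rather than a drop-in solution: it assumes Akka 2.6 or later (so the implicit ActorSystem also supplies the Materializer), uses String elements instead of TextMessage to stay self-contained, and onRabbitMessage is a hypothetical stand-in for the body of your RabbitMQ subscription.

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object QueueSourceSketch {
  def run(): Seq[String] = {
    // Assumption: Akka 2.6+, where an implicit ActorSystem provides the Materializer.
    implicit val system: ActorSystem = ActorSystem("queue-sketch")
    try {
      // Materialize the queue up front so a callback can hold on to it.
      // dropHead discards the oldest buffered element when the buffer is full.
      val (queue, source) =
        Source.queue[String](100, OverflowStrategy.dropHead).preMaterialize()

      // In the WebSocket case this source would go into Flow.fromSinkAndSource;
      // here it is drained into a Seq so the result can be inspected.
      val received = source.runWith(Sink.seq)

      // Hypothetical stand-in for the RabbitMQ subscription callback.
      val onRabbitMessage: String => Unit = { body =>
        queue.offer(body)
        ()
      }

      onRabbitMessage("first")
      onRabbitMessage("second")
      queue.complete() // lets the stream finish once buffered elements are emitted

      Await.result(received, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Compared with subscribing inside mapMaterializedValue, this materializes the source once up front, so you would create one queue per connection before building the flow.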