
An actor initializes an Akka stream that connects to a websocket. It uses a Source.actorRef to which messages can be sent; these are then processed by the webSocketClientFlow and consumed by a Sink.foreach. The following code (derived from the Akka docs) shows this:

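import javax.inject.Inject

import akka.Done
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

class TestActor @Inject()(implicit ec: ExecutionContext) extends Actor with ActorLogging {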

  final implicit val system: ActorSystem = ActorSystem()
  final implicit val materializer: ActorMaterializer = ActorMaterializer()

  def receive = {
    case _ =>
  }

  // Consume the incoming messages from the websocket.
  val incoming: Sink[Message, Future[Done]] =
    Sink.foreach[Message] {
      case message: TextMessage.Strict =>
        println(message.text)
      case misc => println(misc)
    }

  // Source through which we can send messages to the websocket.
  val outgoing: Source[TextMessage, ActorRef] =
    Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)

  // flow to use (note: not re-usable!)
  val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws-feed.gdax.com"))

  // Materialize the stream
  val ((ws, upgradeResponse), closed) =
    outgoing
      .viaMat(webSocketFlow)(Keep.both)
      .toMat(incoming)(Keep.both) // also keep the Future[Done]
      .run()

  // Check whether the server has accepted the websocket request.
  val connected = upgradeResponse.flatMap { upgrade =>
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Future.successful(Done)
    } else {
      throw new RuntimeException(s"Failed: ${upgrade.response.status}")
    }
  }

  // When the connection has been established.
  connected.onComplete(println)

  // When the stream has closed
  closed.onComplete {
    case Success(_) => println("Test Websocket closed gracefully")
    case Failure(e) => log.error(e, "Test Websocket closed with an error")
  }

}

When Play Framework recompiles, it stops the TestActor but does not close the Akka stream. The stream is closed only when the websocket times out.

Does this mean that I need to close the stream manually, for example by sending a PoisonPill to the actor created with Source.actorRef from the TestActor's postStop method?

Note: I also tried injecting the Materializer and the ActorSystem, i.e.:

@Inject()(implicit ec: ExecutionContext, mat: Materializer, system: ActorSystem)

With this change the stream is closed when Play recompiles, but an error is also logged:

[error] a.a.ActorSystemImpl - Websocket handler failed with
Processor actor [Actor[akka://application/user/StreamSupervisor-62/flow-0-0-ignoreSink#989719582]] 
terminated abruptly

1 Answer

In your first example, you're creating an actor system inside your actor. You should not do that: actor systems are expensive, and creating one means starting thread pools, schedulers, and so on. You're also never shutting it down, which is a much bigger problem than the stream not shutting down. It's a resource leak, because the thread pools created by the actor system are never released.

So essentially, every time you receive a WebSocket connection, you're creating a new actor system with a new set of thread pools, and you're never shutting them down. In production, with even a small load (a few requests per second), your application is going to run out of memory within a few minutes.

In general in Play, you should never create your own actor system, but rather have one injected. From within an actor, you don't even need to inject it, because context.system already gives you access to the actor system that created the actor. The same goes for materializers. They aren't as heavyweight, but if you create one per connection and never shut it down, you could also run out of memory, so you should have the materializer injected.
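As a rough sketch of that advice (a trimmed-down variant, not your full actor), the class below takes an injected Materializer and reuses context.system instead of calling ActorSystem(). It assumes that Play's default bindings provide the Materializer and that you are on akka-http 10.0/10.1, where Http() takes an implicit ActorSystem. The stream wiring from the question is left out.

```scala
import javax.inject.Inject

import akka.actor.{Actor, ActorLogging, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.WebSocketRequest
import akka.stream.Materializer

// Sketch: the Materializer is injected, and the ActorSystem is the one
// that already runs this actor. Nothing new is created here.
class TestActor @Inject()(implicit mat: Materializer) extends Actor with ActorLogging {

  // Reuse the system that created this actor instead of ActorSystem().
  private implicit val system: ActorSystem = context.system

  // Building the flow does not open a connection; that only happens
  // when a stream using it is materialized with the injected Materializer.
  private val webSocketFlow =
    Http().webSocketClientFlow(WebSocketRequest("wss://ws-feed.gdax.com"))

  def receive: Receive = {
    case _ =>
  }
}
```

Because both the system and the materializer now belong to Play, they are shut down together with the application rather than leaking on every reload.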

When you do have it injected, you get an error. This is hard to avoid, though not impossible. The difficulty is that Akka can't know automatically in what order things need to be shut down for everything to close gracefully. Should it shut your actor down first, so that the actor can shut the streams down gracefully? Or should it shut the streams down first, so that they can notify your actor that they have shut down and the actor can respond accordingly?

Akka 2.5 has a solution for this: a managed shutdown sequence, where you can register things to be shut down before the actor system starts killing things in a somewhat random order:

https://doc.akka.io/docs/akka/2.5/scala/actors.html#coordinated-shutdown

You can use this in combination with Akka Streams kill switches to shut down your streams gracefully before the rest of the application is shut down.
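A minimal sketch of that combination, under some assumptions: the tick source stands in for your websocket stream, and the object name, the task name "stop-websocket-stream", and the choice of the before-service-unbind phase are all illustrative rather than anything Play or Akka prescribes.

```scala
import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.stream.{KillSwitches, Materializer, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Future
import scala.concurrent.duration._

object KillSwitchShutdownSketch {

  // Starts a stand-in stream and registers a coordinated-shutdown task
  // that stops it. Returns the kill switch and the completion future.
  def start()(implicit system: ActorSystem, mat: Materializer): (UniqueKillSwitch, Future[Done]) = {
    val (killSwitch, closed) =
      Source.tick(0.seconds, 1.second, "tick") // hypothetical stand-in source
        .viaMat(KillSwitches.single)(Keep.right) // keep the UniqueKillSwitch
        .toMat(Sink.foreach[String](msg => println(msg)))(Keep.both)
        .run()

    // The task runs in an early phase, before the actor system is terminated.
    // Returning `closed` makes the phase wait (up to its timeout) for the stream.
    CoordinatedShutdown(system).addTask(
      CoordinatedShutdown.PhaseBeforeServiceUnbind, "stop-websocket-stream") { () =>
      killSwitch.shutdown()
      closed
    }

    (killSwitch, closed)
  }
}
```

Inside an actor you would pass context.system and the injected materializer. The same kill switch could also be triggered from postStop if you want the stream to end whenever the actor stops.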

But generally, the shutdown errors are fairly benign, so if it were me, I wouldn't worry about them.