An actor initializes an Akka stream which connects to a websocket. This is done by using a Source.actorRef
to which messages can be send, which are then processed by the webSocketClientFlow
and consumed by a Sink.foreach
. This can be seen in the following code (derived from akka docs):
class TestActor @Inject()(implicit ec: ExecutionContext) extends Actor with ActorLogging {
final implicit val system: ActorSystem = ActorSystem()
final implicit val materializer: ActorMaterializer = ActorMaterializer()
def receive = {
case _ =>
}
// Consume the incoming messages from the websocket.
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
case misc => println(misc)
}
// Source through which we can send messages to the websocket.
val outgoing: Source[TextMessage, ActorRef] =
Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)
// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws-feed.gdax.com"))
// Materialized the stream
val ((ws,upgradeResponse), closed) =
outgoing
.viaMat(webSocketFlow)(Keep.both)
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
// Check whether the server has accepted the websocket request.
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Failed: ${upgrade.response.status}")
}
}
// When the connection has been established.
connected.onComplete(println)
// When the stream has closed
closed.onComplete {
case Success(_) => println("Test Websocket closed gracefully")
case Failure(e) => log.error("Test Websocket closed with an error\n", e)
}
}
When play framework recompiles it closes the TestActor but does not close the Akka stream. Only when the websocket timeouts the stream is closed.
Does this mean that I need to close the stream manually by for example, sending the actor created with Source.actorRef
a PoisonPill
in the TestActor PostStop
function?
Note: I also tried to inject the Materializer
and the Actorsystem
i.e:
@Inject()(implicit ec: ExecutionContext, implicit val mat: Materializer, implicit val system: ActorSystem)
When Play recompiles, the stream is closed, but also produces an error:
[error] a.a.ActorSystemImpl - Websocket handler failed with
Processor actor [Actor[akka://application/user/StreamSupervisor-62/flow-0-0-ignoreSink#989719582]]
terminated abruptly