I am using Akka Streams and Akka HTTP to implement a WebSocket stream. The stream uses a queue as its source, to which TextMessages are offered as follows:
val (queue, source) = Source.queue[Message](0, OverflowStrategy.backpressure).recoverWithRetries(-1, {
  case exception: Exception =>
    println(exception)
    Source(Nil)
}).preMaterialize()

def send[T](message: T)(implicit jsonFormat: JsonFormat[T]): Unit =
  queue.offer(TextMessage.Strict(message.toJson.toString()))
The flow is built as follows:
val flow: Flow[Message, Message, NotUsed] = Flow.fromSinkAndSource(Sink.ignore, source).via(reportErrorsFlow)

def reportErrorsFlow[T]: Flow[T, T, Any] =
  Flow[T]
    .watchTermination()((_, f) => f.onComplete {
      case Failure(cause) =>
        println(s"WS stream failed with $cause")
      case ex => println("Complete", ex) // ignore regular completion
    })
It is then provided to the routes:
val websocketRoute: Route =
  path(pathName) {
    handleWebSocketMessages(flow)
  }
The server is then spun up:
val routes = cors() {
  concat(health, websocketRoute, ...other routes)
}
val bindingFuture = Http().newServerAt("localhost", 8080).bind(routes)
println("Server online at http://localhost:8080/")
println("Press RETURN to stop...")
StdIn.readLine() // let it run until user presses return
bindingFuture
  .flatMap(_.unbind()) // trigger unbinding from the port
  .onComplete(_ => system.terminate())
The issue I am running into is that the WebSocket does not close gracefully. Closing it while messages are in flight crashes the flow, and when I then try to connect again I receive the following error:
Websocket handler failed with Cannot subscribe to shut-down Publisher (akka.stream.impl.ActorPublisher$NormalShutdownException: Cannot subscribe to shut-down Publisher)
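
The behaviour can probably be isolated from Akka HTTP altogether. The following is a minimal sketch, assuming Akka 2.6 (where an implicit ActorSystem supplies the materializer) and assuming that the pre-materialized source cannot be subscribed to again once its first subscriber has gone away. The object name `PreMaterializedSourceSketch`, the `run` helper and the `String` element type are hypothetical and only serve the illustration; the first run stands in for a WebSocket connection that receives a message and then closes, the second for the reconnect attempt.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-alone sketch; not part of the server code above.
object PreMaterializedSourceSketch {

  // Returns the outcome of the first and of the second subscriber.
  def run(): (Try[Done], Try[Done]) = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    try {
      // Same construction as in the question, minus the recover stage.
      val (queue, source) =
        Source.queue[String](0, OverflowStrategy.backpressure).preMaterialize()

      // Stand-in for the first connection: take one message, then go away.
      val firstRun = source.take(1).runWith(Sink.ignore)
      queue.offer("hello")
      val first = Try(Await.result(firstRun, 5.seconds))

      // Give the cancellation time to reach the pre-materialized stream.
      Thread.sleep(500)

      // Stand-in for the reconnect: subscribe to the very same source again.
      val second = Try(Await.result(source.runWith(Sink.ignore), 5.seconds))
      (first, second)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (first, second) = run()
    println(s"first subscriber:  $first")
    println(s"second subscriber: $second")
  }
}
```

If the assumption holds, the second subscriber's outcome should be a failure resembling the error above, which would point at the single shared `source` value being reused for every connection rather than at the WebSocket handling itself.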