Web Socket connections in Akka Http are treated as an Akka Streams Flow
. This seems like it works great for basic request-reply, but it gets more complex when messages should also be pushed out over the websocket. The core of my server looks kind of like:
lazy val authSuccessMessage = Source.fromFuture(someApiCall)
lazy val messageFlow = requestResponseFlow
.merge(updateBroadcastEventSource)
lazy val handler = codec
.atop(authGate(authSuccessMessage))
.join(messageFlow)
handleWebSocketMessages {
handler
}
Here, codec
is a (de)serialization BidiFlow
and authGate
is a BidiFlow
that processes an authorization message and prevents outflow of any messages until authorization succeeds. Upon success, it sends authSuccessMessage
as a reply. requestResponseFlow
is the standard request-reply pattern, and updateBroadcastEventSource
mixes in async push messages.
I want to be able to send an error message and terminate the connection gracefully in certain situations, such as bad authorization, someApiCall
failing, or a bad request processed by requestResponseFlow
. So basically, basically it seems like I want to be able to asynchronously complete messageFlow
with one final message, even though its other constituent flows are still alive.