WebSocket connections in Akka HTTP are modeled as an Akka Streams Flow. This works well for basic request-reply, but it gets more complex when messages should also be pushed out over the WebSocket. The core of my server looks roughly like this:

lazy val authSuccessMessage = Source.fromFuture(someApiCall)

lazy val messageFlow = requestResponseFlow
  .merge(updateBroadcastEventSource)

lazy val handler = codec
  .atop(authGate(authSuccessMessage))
  .join(messageFlow)

handleWebSocketMessages {
  handler
}

Here, codec is a (de)serialization BidiFlow and authGate is a BidiFlow that processes an authorization message and prevents outflow of any messages until authorization succeeds. Upon success, it sends authSuccessMessage as a reply. requestResponseFlow is the standard request-reply pattern, and updateBroadcastEventSource mixes in async push messages.

I want to be able to send an error message and terminate the connection gracefully in certain situations, such as bad authorization, someApiCall failing, or a bad request processed by requestResponseFlow. So basically, it seems like I want to be able to asynchronously complete messageFlow with one final message, even though its other constituent flows are still alive.

1 Answer

Figured out how to do this using a KillSwitch.

Updated version

The old version did not seem to work when the shutoff was triggered by a BidiFlow stage higher up in the stack (such as my authGate). I'm not sure exactly why, but modeling the shutoff as a BidiFlow itself and placing it further up the stack resolved the issue.

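import akka.stream.KillSwitches
import akka.stream.scaladsl.{BidiFlow, Flow, Source}
import scala.concurrent.Promise

// An implicit ExecutionContext must be in scope for the `foreach` call below.
val shutoffPromise = Promise[Option[OutgoingWebsocketEvent]]()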

/**
 * Shutoff valve for the connection. It is triggered when `shutoffPromise`
 * completes, and sends a final optional termination message if that
 * promise resolves with one.
 */
val shutoffBidi = {
  val terminationMessageSource = Source
    .maybe[OutgoingWebsocketEvent]
    .mapMaterializedValue(_.completeWith(shutoffPromise.future))

  val terminationMessageBidi = BidiFlow.fromFlows(
    Flow[IncomingWebsocketEventOrAuthorize],
    Flow[OutgoingWebsocketEvent].merge(terminationMessageSource)
  )

  val terminator = BidiFlow
    .fromGraph(KillSwitches.singleBidi[IncomingWebsocketEventOrAuthorize, OutgoingWebsocketEvent])
    .mapMaterializedValue { killSwitch =>
      shutoffPromise.future.foreach { _ =>
        println("Shutting down connection")
        killSwitch.shutdown()
      }
    }

  terminationMessageBidi.atop(terminator)
}

Then I apply it just inside the codec:

val handler = codec
  .atop(shutoffBidi)
  .atop(authGate(authSuccessMessage))
  .join(messageFlow)
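
The snippets above show what happens once shutoffPromise completes, but not how it gets completed. Below is a minimal sketch of one way the failure sites from the question (bad authorization, someApiCall failing, a bad request) might trigger it. The names ErrorEvent, requestShutoff and shutoffOnFailure are hypothetical, and OutgoingWebsocketEvent is redefined here only as a stand-in, since the real type is not shown in the post.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}

// Stand-ins for the answer's event types, which are not shown in the post.
sealed trait OutgoingWebsocketEvent
final case class ErrorEvent(reason: String) extends OutgoingWebsocketEvent

object ShutoffTrigger {
  // Hypothetical helper: request shutdown with an optional final message.
  // trySuccess returns false (instead of throwing) when the promise is
  // already completed, so several failure sites can call this safely and
  // only the first call decides the final message.
  def requestShutoff(
      promise: Promise[Option[OutgoingWebsocketEvent]],
      finalMessage: Option[OutgoingWebsocketEvent]
  ): Boolean = promise.trySuccess(finalMessage)

  // Hypothetical hook: shut the connection down with an error message if an
  // asynchronous call (for example someApiCall) fails.
  def shutoffOnFailure[A](
      call: Future[A],
      promise: Promise[Option[OutgoingWebsocketEvent]]
  )(implicit ec: ExecutionContext): Unit =
    call.failed.foreach(e => requestShutoff(promise, Some(ErrorEvent(e.getMessage))))
}
```

For example, authGate could call ShutoffTrigger.requestShutoff(shutoffPromise, Some(ErrorEvent("bad authorization"))) on a rejected authorization, and passing None would close the connection without a final message.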

Old version

val shutoffPromise = Promise[Option[OutgoingWebsocketEvent]]()

/**
 * Shutoff valve for the flow of outgoing messages. It is triggered when
 * `shutoffPromise` completes, and sends a final optional termination
 * message if that promise resolves with one.
 */
val shutoffFlow = {
  val terminationMessageSource = Source
    .maybe[OutgoingWebsocketEvent]
    .mapMaterializedValue(_.completeWith(shutoffPromise.future))

  Flow
    .fromGraph(KillSwitches.single[OutgoingWebsocketEvent])
    .mapMaterializedValue { killSwitch =>
      shutoffPromise.future.foreach(_ => killSwitch.shutdown())
    }
    .merge(terminationMessageSource)
}

Then the handler looks like:

val handler = codec
  .atop(authGate(authSuccessMessage))
  .join(messageFlow via shutoffFlow)
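
To try the old-version shutoffFlow in isolation, here is a small sketch that runs the same construction over a plain stream of strings instead of a WebSocket. It assumes Akka 2.6 or later (where an implicit ActorSystem provides the materializer); the object name ShutoffFlowDemo, the tick source and the timings are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object ShutoffFlowDemo {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("shutoff-demo")
    import system.dispatcher // ExecutionContext for foreach and the scheduler

    try {
      val shutoffPromise = Promise[Option[String]]()

      // Same shape as shutoffFlow above, with String as the element type.
      val shutoffFlow = {
        val terminationMessageSource = Source
          .maybe[String]
          .mapMaterializedValue(_.completeWith(shutoffPromise.future))

        Flow
          .fromGraph(KillSwitches.single[String])
          .mapMaterializedValue { killSwitch =>
            shutoffPromise.future.foreach(_ => killSwitch.shutdown())
          }
          .merge(terminationMessageSource)
      }

      // A source that would never complete on its own.
      val done = Source
        .tick(0.millis, 100.millis, "tick")
        .via(shutoffFlow)
        .runWith(Sink.seq)

      // Trigger the shutoff with a final message after a short delay.
      system.scheduler.scheduleOnce(300.millis) {
        shutoffPromise.trySuccess(Some("goodbye"))
        ()
      }

      Await.result(done, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Calling ShutoffFlowDemo.run() should return some "tick" elements plus the "goodbye" message, and the stream completes even though the tick source is unbounded. The sketch makes no promise about whether a tick can still arrive after "goodbye", because the kill switch is shut down from a separate future callback.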