
I am playing around with Akka Streams/HTTP and trying to figure out how to do the following in a WebSocket server (ideally without using the Actor pattern):

1. On the initial connection request from the client, after the handshake, the server listens for the client's initial Message in JSON format.
2. Once the server has received the TextMessage.Strict and validated it, it enriches the Message to build a "Predicate" (e.g. a filter map), then uses that "Predicate" to build a Source[Message, _].

I tried using handleMessagesWithSinkSource, but it seems this API is intended for a Sink and a Source that are created independently (see route /ws). I also tried handleWebSocketMessages (see route ws/filter), but I see no connection between the Sink (inlet) and the Source (outlet) in the Flow. I must be misunderstanding how the Sink is supposed to work here:

My route:

              (path("ws") & get & pathEndOrSingleSlash) {
                extractUpgradeToWebSocket { upgrade =>
                  complete(upgrade.handleMessagesWithSinkSource(Sink.ignore,
                    getSourceAll))
                } // this route works fine, pushing all event Messages to the client once connected
              } ~
              (path("ws" / "filter") & get & pathEndOrSingleSlash) {
                handleWebSocketMessages(getSourceFiltered) 
              } // this route I cannot figure out a way to build `Flow` dynamically based on `Sink`...


The idea behind getSourceAll and getSourceFiltered:

val getSourceAll: Source[Message, NotUsed] = ??? // Stream source generating Messages based on backend events

val getSourceFiltered: Flow[Message, Message, _] = ??? // the outgoing Source should push Server event messages based on client's "Predicate" message 

After the connection is established, I would expect the user to send over a filter like:

{
  "productId": 1,
  "city": "New York"
}

The server should then keep pushing a real-time stream of events from the backend (database), with more detailed product info, back to the client:

{
  "orderId": 1122,
  "productId": 1,
  "productName": "Coke",
  "vendor": "ABC",
  "city": "New York",
  "timestamp": "2019-11-13 09:30:00"
}
{
  "orderId": 3322,
  "productId": 1,
  "productName": "Coke",
  "vendor": "EFG",
  "city": "New York",
  "timestamp": "2019-11-13 09:31:00"
}
...

Is an Actor absolutely required to handle this? If so, any guidance would be really helpful!

Update: In short, how can I emit an event-driven, server-push Source based on the user's Message?

Additionally, maybe I am confused about how to build a new Source[Message, _]. I know a Flow is immutable, but there should be a way to switch based on input (Flow.fromSinkAndSource?). Akka HTTP only has directive APIs like handleWebSocketMessages(flow: Flow[Message, Message, _]), which only consumes the input message but does not produce a new Source, and with handleMessagesWithSinkSource(sink, source) the sink and source have no logical connection that I can see. I am still trying to wrap my head around how to make it work.

Could you say more about how the predicate is determined in the "ws" / "filter" case? Maybe write out a sample of what the user would send over the websocket connection and how the server should reply? Based on what you've written, it seems unlikely you'd need to use Actor directly in any way. - Alec
Thank you @Alec, I have updated the description with example client-side input and the expected server push stream. Hope it makes more sense now. - Rando

1 Answer


Although it isn't immediately obvious, a Flow[Message, Message, _] is enough to implement most protocols. Remember that a Flow can build up almost arbitrary amounts of state via operators like statefulMapConcat or flatMapConcat. A Flow can even start emitting elements without having directly received an input to reply to, via operators like extrapolate or by merging with some ticking source.
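To make the "merging with a ticking source" idea concrete, here is a minimal sketch. The names `ticks` and `echoWithHeartbeat` and the one-second interval are made up for illustration, and this is not the flow you need for your filter case; it only shows that a Flow can push messages the client never asked for.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.duration._

// Hypothetical heartbeat: one message per second, independent of client input.
val ticks: Source[Message, _] =
  Source.tick(1.second, 1.second, TextMessage("tick"))

// Pass every client message straight through and interleave the heartbeat.
// eagerComplete = true completes the flow once the client side completes,
// instead of ticking forever.
val echoWithHeartbeat: Flow[Message, Message, NotUsed] =
  Flow[Message].merge(ticks, eagerComplete = true)
```

A flow like this can be passed to handleWebSocketMessages as-is.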

In your case:

import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Source}

val getSourceFiltered: Flow[Message, Message, _] = Flow[Message]
  .take(1)  // you only care about the first thing that the client sends
  .flatMapConcat {
    case TextMessage.Strict(txtMsg) =>
      // Here is where you parse the client's message and build your filter from it
      // (makeFilter stands in for your own parsing logic)
      val clientFilter: Message => Boolean = makeFilter(txtMsg)
      getSourceAll.filter(clientFilter)

    case _ => Source.single(TextMessage("Expected a single strict JSON message"))
  }
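For completeness, one possible shape for makeFilter, which the snippet above only names. This is a sketch under several assumptions that are not in your question: spray-json is on the classpath, the client sends valid JSON (quoted keys and strings), and every event coming out of getSourceAll is a strict text message containing a JSON object. A filter that cannot be parsed matches nothing here; you may prefer to reply with an error instead.

```scala
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import spray.json._
import scala.util.Try

// Hypothetical helper: keeps an event only if it carries every
// key/value pair the client asked for.
def makeFilter(filterJson: String): Message => Boolean = {
  // Fields the client wants to match; None if the filter is not a JSON object.
  val wanted: Option[Map[String, JsValue]] =
    Try(filterJson.parseJson.asJsObject.fields).toOption

  (msg: Message) => msg match {
    case TextMessage.Strict(body) =>
      // Events that are not JSON objects are treated as having no fields.
      val event = Try(body.parseJson.asJsObject.fields)
        .getOrElse(Map.empty[String, JsValue])
      wanted.exists(_.forall { case (key, value) => event.get(key).contains(value) })
    case _ =>
      false // streamed or binary events are dropped in this sketch
  }
}
```

With the example from the question, makeFilter("""{"productId": 1, "city": "New York"}""") keeps both sample orders and drops any event with a different productId or city. If your events exist as typed objects before they are rendered to Message, filtering at that stage would avoid re-parsing each event.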