I am playing around with Akka Streams/HTTP and trying to figure out how to do the following in a WebSocket server (hopefully without using the Actor pattern):
1. On the initial connection request from the client, after the handshake, the server listens for the client's initial Message in JSON format.
2. After the server receives the TextMessage.Strict, and if it is valid, it enriches the Message to build a "Predicate" (e.g. a filter map). The server then uses that "Predicate" to build a Source[Message, _].
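To make the two steps concrete, here is a minimal sketch of the shape I have in mind, not a confirmed solution. It assumes that `flatMapConcat` is an acceptable way to turn an incoming message into an outgoing stream; `eventsFor` is a hypothetical function (not part of Akka) that takes the raw JSON text of the filter and returns the matching server-side event stream.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Source}

object FilteredFlowSketch {
  // `eventsFor` is hypothetical: given the client's filter as JSON text,
  // it returns the stream of server events matching that filter.
  def filteredFlow(
      eventsFor: String => Source[Message, NotUsed]
  ): Flow[Message, Message, NotUsed] =
    Flow[Message]
      // Treat only strict text frames as filter requests; drop everything else.
      .collect { case TextMessage.Strict(text) => text }
      // Replace each filter request with the stream built from it. The next
      // request is only pulled once the current inner stream has completed.
      .flatMapConcat(eventsFor)
}
```

With an unbounded event stream, this means only the first filter message is ever acted on, because later messages wait until the current stream completes. The resulting value has the type that handleWebSocketMessages expects.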
I tried using handleMessagesWithSinkSource, but it seems to me this API is intended for supplying the Sink and the Source independently (see route /ws). I also tried using handleWebSocketMessages (see route ws/filter), but to me there is no connection between the Sink (inlet) and the Source (outlet) in the Flow. I must be misunderstanding how Sink should work here:
My route:
(path("ws") & get & pathEndOrSingleSlash) {
  extractUpgradeToWebSocket { upgrade =>
    complete(upgrade.handleMessagesWithSinkSource(Sink.ignore, getSourceAll))
  } // this route works fine, pushing all event Messages to the client once connected
} ~
(path("ws" / "filter") & get & pathEndOrSingleSlash) {
  handleWebSocketMessages(getSourceFiltered)
} // for this route I cannot figure out a way to build the `Flow` dynamically based on the `Sink`...
The idea behind getSourceAll and getSourceFiltered:
val getSourceAll: Source[Message, NotUsed] = ??? // stream source generating Messages based on backend events
val getSourceFiltered: Flow[Message, Message, _] = ??? // the outgoing side should push server event messages based on the client's "Predicate" message
After the connection is established, I would expect the user to send over a filter like:
{
  "productId": 1,
  "city": "New York"
}
Then the server should keep pushing a real-time stream of events from the backend (database), with more detailed product info, back to the client:
{
  "orderId": 1122,
  "productId": 1,
  "productName": "Coke",
  "vendor": "ABC",
  "city": "New York",
  "timestamp": "2019-11-13 09:30:00"
}
{
  "orderId": 3322,
  "productId": 1,
  "productName": "Coke",
  "vendor": "EFG",
  "city": "New York",
  "timestamp": "2019-11-13 09:31:00"
}
...
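As a rough illustration of what I mean by "Predicate", the filter and the events above could be modelled like this. The case classes and `toPredicate` are hypothetical names made up for this sketch; the field names simply mirror the sample payloads, and JSON parsing is left out.

```scala
object FilterSketch {
  // Hypothetical model of the client's filter message.
  final case class ProductFilter(productId: Int, city: String)

  // Hypothetical model of one backend event, mirroring the sample payloads.
  final case class OrderEvent(
      orderId: Int,
      productId: Int,
      productName: String,
      vendor: String,
      city: String,
      timestamp: String
  )

  // Turn the client's filter into a predicate over backend events:
  // an event matches when both productId and city are equal.
  def toPredicate(filter: ProductFilter): OrderEvent => Boolean =
    event => event.productId == filter.productId && event.city == filter.city
}
```

A predicate of this shape could then be applied to an event stream with the `filter` operator before each event is rendered as a TextMessage.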
Is an Actor absolutely required to handle this? If so, any guidance would be really helpful!
Update
In short, how can I emit an event-driven, server-push Source based on the user's Message?
Additionally, maybe I am confused about how to build a new Source[Message, _]? I know a Flow is immutable, but there should be a way to switch based on the input (Flow.fromSinkAndSource?). Akka only has directive APIs like handleWebSocketMessages(flow: Flow[Message, Message, _]), which only consumes the input message but does not produce a new Source, and as for handleMessagesWithSinkSource(sink, source), the sink and source have no logical connection as far as I can tell. I am still trying to wrap my head around how to make it work.
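To show what I mean by "no logical connection", here is a small sketch using plain Akka Streams types instead of WebSocket messages. As far as I understand, a Flow built with Flow.fromSinkAndSource routes its input into the sink and takes its output from the source, with nothing linking the two; the object and value names are made up for illustration.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink, Source}

object IndependentSinkSourceSketch {
  // Incoming Ints are swallowed by Sink.ignore; outgoing Strings come from an
  // unrelated source, so the input never influences the output.
  val flow: Flow[Int, String, NotUsed] =
    Flow.fromSinkAndSource(Sink.ignore, Source(List("x", "y")))
}
```

Whatever is sent into this flow, the downstream only ever sees the elements of the fixed source, which is why this construction alone does not seem to let the client's message shape the server's output.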
"ws" / "filter"
case? Maybe write out a sample of what the user would send over the websocket connection and how the server should reply? Based on what you've written, it seems unlikely you'd need to use Actor directly in any way. – Alec