I am trying to consume a stream via WebSockets, and I would like to parse the Messages received and transform them before they reach the Sink. However, I keep getting errors whenever I use a Source or a Sink that does not take Message as input.
According to the documentation:
Therefore a WebSocket connection is modelled as either something you connect a Flow[Message, Message, Mat] to or a Flow[Message, Message, Mat] that you connect a Source[Message, Mat] and a Sink[Message, Mat] to.
I am still not sure I understand this correctly. Do Sources, Flows, and Sinks used with Akka HTTP WebSockets always have to pass around the type Message? Is there a way around it? And most importantly, what is the best practice here?
Below is a simplified snippet of my code. It is not intended to be runnable, but it should help illustrate my question.
val outgoing = Source.maybe[Message]
val decoder = Flow[Message] map {x => TextMessage("Hello from decode")}
// Do I need to pass a Message here?
val wrongDecoder = Flow[String] map {x => "Help :( I can't Sink! Maybe because I'm String?"}
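val sink: Sink[Message, Future[Done]] = Sink.foreach[Message] {
  case message: TextMessage.Strict => println(message.text)
  case _ => () // ignore streamed text and binary messages instead of failing with a MatchError
}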
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest(uri))
val (upgradeResponse, closed) =
  outgoing
    .viaMat(webSocketFlow)(Keep.right)
    .viaMat(decoder)(Keep.left)
    .viaMat(wrongDecoder)(Keep.left) // IDE compiler tells me it expected a Graph but found a Flow?
    .toMat(sink)(Keep.both)
    .run()
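For comparison, here is a minimal sketch of one way the types might be made to line up: convert Message to String once, right after the WebSocket flow, so that later stages can be typed on String. It assumes Akka 2.6+ (so a Materializer can be derived from the implicit ActorSystem) and Akka HTTP 10.1+ (for TextMessage.toStrict). The names MessageToStringSketch, messageToString, upperCase, and run are hypothetical, and an in-memory Source stands in for the inbound side of webSocketClientFlow. This is not necessarily the recommended practice, only an illustration of the typing.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MessageToStringSketch {

  // Hypothetical adapter at the WebSocket boundary: Message in, String out.
  def messageToString(implicit system: ActorSystem): Flow[Message, String, NotUsed] = {
    import system.dispatcher
    Flow[Message]
      .mapAsync(parallelism = 1) {
        case tm: TextMessage =>
          // Collect a possibly streamed text message into a single String.
          tm.toStrict(3.seconds).map(strict => Option(strict.text))
        case bm: BinaryMessage =>
          // Drain binary data so the connection is not stalled, then skip it.
          bm.dataStream.runWith(Sink.ignore).map(_ => Option.empty[String])
      }
      .collect { case Some(text) => text }
  }

  // A downstream stage typed on String rather than Message.
  val upperCase: Flow[String, String, NotUsed] = Flow[String].map(_.toUpperCase)

  // Runs the pipeline over in-memory messages (a stand-in for the WebSocket).
  def run(messages: List[Message])(implicit system: ActorSystem): Future[Seq[String]] =
    Source(messages).via(messageToString).via(upperCase).runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    val messages = List[Message](TextMessage("hello"), TextMessage("world"))
    println(Await.result(run(messages), 5.seconds))
    system.terminate()
  }
}
```

With an adapter like this placed directly after webSocketFlow, a Flow[String, String, _] can follow it, and the final Sink would then need to accept String instead of Message.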