I currently have a simple TextMessage Source that sends messages to my Websocket client flow like this:
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
case _ =>
}
// send this as a message over the WebSocket
val outgoing: Source[TextMessage.Strict, NotUsed] = Source
.combine(
Source.single(
TextMessage(
"""{"auth":"APIKEY-123"}"""
)
),
Source.single(
TextMessage(
"""{"topic":"topic123"}"""
)
),
Source.never
)(Merge(_))
.throttle(1, 1.second)
val webSocketFlow = Http().webSocketClientFlow(
WebSocketRequest("wss://socket.polygon.io/stocks")
)
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(
Keep.right
) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(
s"Connection failed: ${upgrade.response.status}"
)
}
}
So I currently have a Source of type Source[TextMessage.Strict, NotUsed], but I want to use the commented out code where I have a ActorRef as my source.
I tried this:
val actorSource: Source[Any, ActorRef] = Source.actorRef(
completionMatcher = { case Done =>
CompletionStrategy.immediately
},
failureMatcher = PartialFunction.empty,
bufferSize = 100,
overflowStrategy = OverflowStrategy.dropHead
)
val actorRef: ActorRef = actorSource.to(Sink.foreach(println)).run()
actorRef ! """{"auth":"APIKEY-123"}"""
val webSocketFlow = Http().webSocketClientFlow(
WebSocketRequest("wss://socket.polygon.io/stocks")
)
val (upgradeResponse, closed) =
actorSource
.viaMat(webSocketFlow)(
Keep.right
) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
So when I am using a ActorRef as my source, I am having a hard time trying to fit this into the graph. I am getting this compile time error:
type mismatch; [error] found : akka.stream.scaladsl.Flow[akka.http.scaladsl.model.ws.Message,akka.http.scaladsl.model.ws.Message,scala.concurrent.Future[akka.http.scaladsl.model.ws.WebSocketUpgradeResponse]] [error] required: akka.stream.Graph[akka.stream.FlowShape[String,?],?] [error]
.viaMat(webSocketFlow)(
Note: I want a Actor as my source, and also as my sink i.e. pass all messages that result from the flow to another actor as a sink.
Can someone explain what am I currently doing wrong with my Actor as a source and trying to add it to my flow/graph?
Update
Here is the code I have now:
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem()
import system.dispatcher
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
case _ =>
// ignore other message types
}
val actorSource = Source.actorRef[String](
completionMatcher = { case Done =>
CompletionStrategy.immediately
},
failureMatcher = PartialFunction.empty,
bufferSize = 100,
overflowStrategy = OverflowStrategy.dropHead
)
val webSocketFlow = Http().webSocketClientFlow(
WebSocketRequest("wss://socket.polygon.io/stocks")
)
// the materialized value is a tuple with
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] with the stream completion from the incoming sink
val ((sendActor, upgradeResponse), closed) =
actorSource
.viaMat(webSocketFlow)(
Keep.both
) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(
s"Connection failed: ${upgrade.response.status}"
)
}
}
sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
sendActor ! TextMessage("""{"topic":"topic123"}""")
//in a real application you would not side effect here
connected.onComplete(println)
closed.foreach(_ => println("closed"))
}
I am getting the following compile errors:
[error] The argument types of an anonymous function must be fully known. (SLS 8.5) [error] Expected type was: ? [error]
completionMatcher = { case Done => [error] ^ [error] /home/blank/scala/testing/streamsapp/ws2.scala:57:37: value flatMap is not a member of Any [error] val connected = upgradeResponse.flatMap { upgrade => [error]
^ [error] /home/blank/scala/testing/streamsapp/ws2.scala:67:15: value ! is not a member of Any [error] sendActor ! TextMessage("""{"auth":"APIKEY-123"}""") [error] ^ [error] /home/blank/scala/testing/streamsapp/ws2.scala:68:15: value ! is not a member of Any [error] sendActor ! TextMessage("""{"topic":"topic123"}""") [error] ^ [error] /home/blank/scala/testing/streamsapp/ws2.scala:72:12: value foreach is not a member of Any [error] closed.foreach(_ => println("closed")) [error] ^ [error] 5 errors found
actorSource
is aSource
so you won't get anActorRef
until yourun
it, and you get a differentActorRef
each time. And probably worth usingMessage
rather thanAny
in thesource.ActorRef
declaration – Tim