I currently have a simple TextMessage Source that sends messages to my WebSocket client flow like this:

    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
      }

    // send this as a message over the WebSocket
    val outgoing: Source[TextMessage.Strict, NotUsed] = Source
      .combine(
        Source.single(
          TextMessage(
            """{"auth":"APIKEY-123"}"""
          )
        ),
        Source.single(
          TextMessage(
            """{"topic":"topic123"}"""
          )
        ),
        Source.never
      )(Merge(_))
      .throttle(1, 1.second)

    val webSocketFlow = Http().webSocketClientFlow(
      WebSocketRequest("wss://socket.polygon.io/stocks")
    )

    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(
          Keep.right
        ) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(
          s"Connection failed: ${upgrade.response.status}"
        )
      }
    }

So I currently have a Source of type Source[TextMessage.Strict, NotUsed], but I want to use an ActorRef as my source instead.

I tried this:

val actorSource: Source[Any, ActorRef] = Source.actorRef(
  completionMatcher = { case Done =>
    CompletionStrategy.immediately
  },
  failureMatcher = PartialFunction.empty,
  bufferSize = 100,
  overflowStrategy = OverflowStrategy.dropHead
)

val actorRef: ActorRef = actorSource.to(Sink.foreach(println)).run()
actorRef !  """{"auth":"APIKEY-123"}"""

val webSocketFlow = Http().webSocketClientFlow(
  WebSocketRequest("wss://socket.polygon.io/stocks")
)

val (upgradeResponse, closed) =
  actorSource
    .viaMat(webSocketFlow)(
      Keep.right
    ) // keep the materialized Future[WebSocketUpgradeResponse]
    .toMat(incoming)(Keep.both) // also keep the Future[Done]
    .run()

When I use an ActorRef as my source, I have a hard time fitting it into the graph. I get this compile-time error:

    type mismatch;
    [error]  found   : akka.stream.scaladsl.Flow[akka.http.scaladsl.model.ws.Message,akka.http.scaladsl.model.ws.Message,scala.concurrent.Future[akka.http.scaladsl.model.ws.WebSocketUpgradeResponse]]
    [error]  required: akka.stream.Graph[akka.stream.FlowShape[String,?],?]
    [error]     .viaMat(webSocketFlow)(

Note: I want an Actor as my source and also as my sink, i.e. all messages that come out of the flow should be passed to another actor acting as the sink.

Can someone explain what I am doing wrong when I use an Actor as a source and try to add it to my flow/graph?

Update

Here is the code I have now:

def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    import system.dispatcher

    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
        // ignore other message types
      }

    val actorSource = Source.actorRef[String](
      completionMatcher = { case Done =>
        CompletionStrategy.immediately
      },
      failureMatcher = PartialFunction.empty,
      bufferSize = 100,
      overflowStrategy = OverflowStrategy.dropHead
    )

    val webSocketFlow = Http().webSocketClientFlow(
      WebSocketRequest("wss://socket.polygon.io/stocks")
    )

    // the materialized value is a tuple with
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] with the stream completion from the incoming sink
    val ((sendActor, upgradeResponse), closed) =
      actorSource
        .viaMat(webSocketFlow)(
          Keep.both
        ) // keep both the ActorRef and the Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    // just like a regular HTTP request, the response status is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that the server supports WebSockets
    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(
          s"Connection failed: ${upgrade.response.status}"
        )
      }
    }

    sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
    sendActor ! TextMessage("""{"topic":"topic123"}""")

    //in a real application you would not side effect here
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }

I am getting the following compile errors:

    [error] The argument types of an anonymous function must be fully known. (SLS 8.5)
    [error] Expected type was: ?
    [error]       completionMatcher = { case Done =>
    [error] ^
    [error] /home/blank/scala/testing/streamsapp/ws2.scala:57:37: value flatMap is not a member of Any
    [error]     val connected = upgradeResponse.flatMap { upgrade =>
    [error]                                     ^
    [error] /home/blank/scala/testing/streamsapp/ws2.scala:67:15: value ! is not a member of Any
    [error]     sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
    [error]               ^
    [error] /home/blank/scala/testing/streamsapp/ws2.scala:68:15: value ! is not a member of Any
    [error]     sendActor ! TextMessage("""{"topic":"topic123"}""")
    [error]               ^
    [error] /home/blank/scala/testing/streamsapp/ws2.scala:72:12: value foreach is not a member of Any
    [error]     closed.foreach(_ => println("closed"))
    [error]            ^
    [error] 5 errors found

Remember that actorSource is a Source, so you won't get an ActorRef until you run it, and you get a different ActorRef each time you run it. It is also probably worth using Message rather than Any in the Source.actorRef declaration. - Tim

1 Answer


Your compiler error arises because your actorSource emits String rather than Message. (That isn't the error I'd expect from the code you posted; perhaps you had tried changing it to a Source[String, ActorRef]?) Since webSocketFlow only processes Messages, it can only be attached to a source of Message.

So I suggest something along the lines of:

val immediateCompletion: PartialFunction[Any, CompletionStrategy] = {
  case Done => CompletionStrategy.immediately
}

val actorSource = Source.actorRef[Message](
  completionMatcher = immediateCompletion,
  failureMatcher = PartialFunction.empty,
  bufferSize = 100,
  overflowStrategy = OverflowStrategy.dropHead
)

val webSocketFlow = Http().webSocketClientFlow(
  WebSocketRequest("wss://socket.polygon.io/stocks")
)

val ((sendActor, upgradeResponse), closed) =
  actorSource
    .viaMat(webSocketFlow)(Keep.both)  // keep both the actor and the upgradeResponse
    .toMat(incoming)(Keep.both)  // ...and also keep the closed
    .run()

sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
sendActor ! TextMessage("""{"topic":"topic123"}""")
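
The question also asks for an actor on the receiving end. A minimal sketch of that, assuming Akka 2.6+ and Akka HTTP on the classpath, could swap `incoming` for `Sink.actorRef`. The `Receiver` actor and its `StreamCompleted` / `StreamFailed` signals are hypothetical names made up for illustration; they are not from the question.

```scala
import akka.{Done, NotUsed}
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.{CompletionStrategy, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}

// Hypothetical signals sent to the receiving actor when the stream ends.
object Receiver {
  case object StreamCompleted
  final case class StreamFailed(cause: Throwable)
}

// Hypothetical actor that receives every Message coming out of the flow.
class Receiver extends Actor {
  def receive: Receive = {
    case TextMessage.Strict(text)     => println(s"received: $text")
    case Receiver.StreamCompleted     => println("stream completed")
    case Receiver.StreamFailed(cause) => println(s"stream failed: ${cause.getMessage}")
    case _                            => // ignore streamed and binary messages
  }
}

object ActorSinkSketch {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    import system.dispatcher

    val receiver: ActorRef = system.actorOf(Props(new Receiver), "receiver")

    // Explicitly typed matcher, as in the answer above.
    val immediateCompletion: PartialFunction[Any, CompletionStrategy] = {
      case Done => CompletionStrategy.immediately
    }

    val actorSource: Source[Message, ActorRef] = Source.actorRef[Message](
      completionMatcher = immediateCompletion,
      failureMatcher = PartialFunction.empty,
      bufferSize = 100,
      overflowStrategy = OverflowStrategy.dropHead
    )

    // Forwards each element to `receiver`, then the completion or failure signal.
    val actorSink: Sink[Message, NotUsed] = Sink.actorRef[Message](
      receiver,
      Receiver.StreamCompleted,
      (cause: Throwable) => Receiver.StreamFailed(cause)
    )

    val webSocketFlow = Http().webSocketClientFlow(
      WebSocketRequest("wss://socket.polygon.io/stocks")
    )

    // Keep the sending ActorRef and the upgrade response; the sink only yields NotUsed.
    val (sendActor, upgradeResponse) =
      actorSource
        .viaMat(webSocketFlow)(Keep.both)
        .toMat(actorSink)(Keep.left)
        .run()

    upgradeResponse.foreach(upgrade => println(s"upgrade status: ${upgrade.response.status}"))

    sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
    sendActor ! TextMessage("""{"topic":"topic123"}""")
  }
}
```

Because `Sink.actorRef` materializes `NotUsed`, there is no `closed` future in this variant; the receiving actor learns about termination through the completion or failure message instead. `Sink.actorRef` also applies no backpressure, so if the receiver may be slower than the socket, `Sink.actorRefWithBackpressure` is probably the better fit.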