I am using Akka HTTP streams to implement a WebSocket server. The client sends a lot of data every minute, so the connection between client and server is kept alive.

In the WebSocket handler I use a Flow and a Sink, specifically Sink.seq. That sink keeps collecting incoming elements until the upstream terminates, which never happens while the connection stays open.

How can I avoid this and process the data as it arrives?

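    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Sink
    import akka.util.{ByteString, ByteStringBuilder}
    import scala.concurrent.Future

    implicit val system = ActorSystem("DecoderSystem")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext required by onComplete

    // Sink.seq buffers every element and completes only when the upstream terminates
    val streamedMsgFuture: Future[Seq[ByteString]] = streamedMsg.runWith(Sink.seq)
    streamedMsgFuture.onComplete { completedStream =>
      val completeBytestring = new ByteStringBuilder()
      //I'm sure there's a better way to do this.. but hey, it's O(n)
      completedStream.foreach { x =>
        x.foreach { y =>
          completeBytestring ++= y
        }
      }
    }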

1 Answer

How to solve this depends entirely on how you want to use the incoming data. That is not clear from the question, so I'm going to assume that you want to perform some action for each incoming ByteString. You can always extend this logic by introducing a batching stage; Akka Streams can help you with that.

You can perform an action on each incoming ByteString with the map or mapAsync combinators, depending on whether your action is synchronous or asynchronous. Elements are then handled as they arrive instead of being collected until the stream ends. Example below:

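    def action(msg: ByteString): SomeResult = ??? // do something
    def asyncAction(msg: ByteString): Future[SomeResult] = ??? // do something

    def handleResult(result: SomeResult): Unit = ??? // do something

    // Synchronous action: each element is transformed as soon as it arrives
    streamedMsg.map(action).runForeach(handleResult)
    // Asynchronous action: up to 5 futures in flight, results emitted in upstream order
    streamedMsg.mapAsync(5)(asyncAction).runForeach(handleResult)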
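
For the batching stage mentioned above, one option is the groupedWithin operator, which emits a batch once it holds a given number of elements or a time window has elapsed, whichever comes first. The following is only a rough sketch: BatchingSketch, processInBatches and the handleBatch callback are hypothetical names, the materializer setup just mirrors the question, and the batch size and window are values you would have to tune yourself.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._

// Hypothetical helper object, not part of Akka.
object BatchingSketch {
  implicit val system: ActorSystem = ActorSystem("BatchingSketch")
  // Same materializer setup as in the question (deprecated but still available in Akka 2.6).
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Groups incoming chunks into batches of at most `maxSize` elements,
  // flushing a non-empty batch early once `window` has elapsed,
  // and passes each batch to the caller-supplied `handleBatch`.
  def processInBatches(
      streamedMsg: Source[ByteString, _],
      maxSize: Int,
      window: FiniteDuration
  )(handleBatch: immutable.Seq[ByteString] => Unit): Future[Done] =
    streamedMsg
      .groupedWithin(maxSize, window)
      .runForeach(handleBatch)
}
```

It could be called as BatchingSketch.processInBatches(streamedMsg, 100, 1.second)(batch => ...). Because each batch is handed off and then dropped, memory use stays bounded by the batch size rather than growing for the lifetime of the connection.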