I'm prototyping a network server using Akka Streams that will listen on a port, accept incoming connections, and continuously read data off each connection. Each connected client will only send data, and will not expect to receive anything useful from the server.

Conceptually, I figured it would be fitting to model the incoming events as one single stream that only incidentally happens to be delivered via multiple TCP connections. Thus, assuming that I have a case class Msg(msg: String) that represents each data message, what I want is to represent the entirety of incoming data as a Source[Msg, _]. This makes a lot of sense for my use case, because I can very simply connect flows & sinks to this source.

Here's the code I wrote to implement my idea:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.SourceShape
import akka.stream.scaladsl._
import akka.util.ByteString
import akka.NotUsed
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

case class Msg(msg: String)

object tcp {
  val N = 2
  def main(argv: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    val connections = Tcp().bind("0.0.0.0", 65432)
    val delim = Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256, allowTruncation = true
    )
    val parser = Flow[ByteString].via(delim).map(_.utf8String).map(Msg(_))
    val messages: Source[Msg, Future[Tcp.ServerBinding]] =
      connections.flatMapMerge(N, {
        connection =>
          println(s"client connected: ${connection.remoteAddress}")
          Source.fromGraph(GraphDSL.create() { implicit builder =>
            import GraphDSL.Implicits._
            val F = builder.add(connection.flow.via(parser))
            val nothing = builder.add(Source.tick(
              initialDelay = 1.second,
              interval = 1.second,
              tick = ByteString.empty
            ))
            F.in <~ nothing.out
            SourceShape(F.out)
          })
      })
    import scala.concurrent.ExecutionContext.Implicits.global
    Await.ready(for {
      _ <- messages.runWith(Sink.foreach {
        msg => println(s"${System.currentTimeMillis} $msg")
      })
      _ <- system.terminate()
    } yield (), Duration.Inf)
  }
}
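The parser flow can be checked on its own, without opening a socket. The sketch below assumes the same Akka 2.5-era setup as the question (ActorSystem plus ActorMaterializer); the object name ParserDemo and the sample chunks are made up for illustration. It feeds two hand-made chunks through the same Framing.delimiter flow to show that a line split across two TCP reads is reassembled, and that with allowTruncation = true a last line without a trailing newline is still emitted when the input completes.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Framing, Sink, Source }
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

case class Msg(msg: String)

object ParserDemo {
  // Same framing and parsing as `parser` in the question.
  val parser: Flow[ByteString, Msg, NotUsed] =
    Flow[ByteString]
      .via(Framing.delimiter(
        ByteString("\n"),
        maximumFrameLength = 256, allowTruncation = true
      ))
      .map(_.utf8String)
      .map(Msg(_))

  def run(): Seq[Msg] = {
    implicit val system: ActorSystem = ActorSystem("parser-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Hypothetical TCP reads: "world" is split across two chunks and
      // "bye" has no trailing newline.
      val chunks = List(ByteString("hello\nwor"), ByteString("ld\nbye"))
      Await.result(Source(chunks).via(parser).runWith(Sink.seq), 3.seconds)
    } finally {
      Await.ready(system.terminate(), 3.seconds)
    }
  }

  def main(args: Array[String]): Unit =
    run().foreach(println) // Msg(hello), Msg(world), Msg(bye), one per line
}
```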

This code works as expected. However, note the val N = 2, which is passed as the breadth argument of the flatMapMerge call that combines the incoming data streams into one. In practice, this means that I can only read from N connections at a time.
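The breadth limit can be reproduced without any networking. In this sketch (the object name BreadthDemo and the four never-ending in-memory sources named "A" to "D" are hypothetical stand-ins for long-lived connections; Akka 2.5-era API assumed), flatMapMerge only pulls a new inner source from upstream when one of the active ones completes. Because none of them ever completes, with breadth = 2 only the first two are ever read.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Await
import scala.concurrent.duration._

object BreadthDemo {
  // Returns the set of stand-in "connections" that produced at least one element.
  def run(breadth: Int): Set[String] = {
    implicit val system: ActorSystem = ActorSystem("breadth-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val seen = Source(List("A", "B", "C", "D"))
        // Each inner source never completes, like a client that keeps sending.
        .flatMapMerge(breadth, name => Source.repeat(name))
        .take(100) // bound the run so the demo terminates
        .runWith(Sink.seq)
      Await.result(seen, 3.seconds).toSet
    } finally {
      Await.ready(system.terminate(), 3.seconds)
    }
  }

  def main(args: Array[String]): Unit =
    println(run(breadth = 2)) // only A and B can appear; C and D are never pulled
}
```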

I don't know how many connections will be made to this server at any given time. Ideally I would want to support as many as possible, but hardcoding an upper bound doesn't seem like the right thing to do.

My question, at long last, is: How can I obtain or create a flatMapMerge stage that can read from more than a fixed number of connections at one time?

You have a machine with infinite TCP ports & memory?! – Viktor Klang
Nope, the infinite memory machine is backordered, I won't have it until Christmas. My problem with the above code is that it latches onto N connections and keeps reading from them until it can't, while all other connections are left to fill up TCP receive buffers. So I either need to stop accepting new connections, or have a rotating cast of N open connections to read from, but not always the same ones. Does this make any sense to you? – Max A.
Isn't that exactly how it's supposed to work? Think of it [merge] like a Selector, it can only deal with a finite number of File Descriptors at a time. – Viktor Klang
Let's say that I have four connections: A, B, C, and D. I set N=2, so I read from A & B, while nothing at all happens to C & D. This is dumb because I should either have never accepted C & D, or have some way of cycling through all my connections N at a time, reading a little from each, then moving on to others. I don't see how this can be done with Akka Streams, but I'm still new. – Max A.
Ah, so you want to cap the accepting of new connections to N? – Viktor Klang

1 Answer

As Viktor Klang's comments indicate, I don't think this is possible in a single stream. However, it should be possible to create a stream that can receive messages after it has been materialized, and to use that stream as a "sink" for the messages coming from the TCP connections.

First create the "sink" stream:

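import akka.stream.OverflowStrategy

// Runs once; the materialized ActorRef is the entry point for all connections.
// With Int.MaxValue as buffer size, nothing backpressures the senders.
val sinkRef =
  Source
    .actorRef[Msg](Int.MaxValue, OverflowStrategy.fail)
    .to(Sink foreach { m => println(s"${System.currentTimeMillis} $m") })
    .run()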
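The key property here is that Source.actorRef materializes to an ActorRef that accepts elements at any time after the stream is running. The standalone sketch below (object name and messages are illustrative; Akka 2.5-era API assumed) sends three messages after materialization and then completes the stream with Status.Success, which lets already buffered elements drain first. Note that Source.actorRef has no backpressure: with OverflowStrategy.fail the stream fails once the buffer overflows, so the Int.MaxValue buffer above effectively means unbounded memory use under a slow consumer.

```scala
import akka.actor.{ ActorSystem, Status }
import akka.stream.{ ActorMaterializer, OverflowStrategy }
import akka.stream.scaladsl.{ Keep, Sink, Source }
import scala.concurrent.Await
import scala.concurrent.duration._

object ActorRefSourceDemo {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("actorref-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Keep both the ActorRef (entry point) and the Sink's result.
      val (ref, collected) = Source
        .actorRef[String](16, OverflowStrategy.fail)
        .toMat(Sink.seq)(Keep.both)
        .run()
      // Elements can be sent at any time after materialization.
      ref ! "a"
      ref ! "b"
      ref ! "c"
      // Completes the stream once the buffered elements have been emitted.
      ref ! Status.Success(())
      Await.result(collected, 3.seconds)
    } finally {
      Await.ready(system.terminate(), 3.seconds)
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```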

Each accepted connection can then forward its parsed messages to sinkRef:

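connections runForeach { conn =>
  // Source.maybe never completes by itself, so the server's write side stays
  // open. Source.empty would complete it at once, and since bind defaults to
  // halfClose = false, that would close the connection before anything is read.
  Source
    .maybe[ByteString]
    .via(conn.flow)
    .via(parser)
    .runForeach(msg => sinkRef ! msg)
}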
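A related alternative, swapped in here and not what this answer uses, is Akka Streams' MergeHub: it is materialized once and yields a Sink that any number of producers can attach to later, and unlike Source.actorRef it backpressures all producers when the consumer cannot keep up. The sketch below is an illustration under assumptions (hypothetical object name, in-memory producers instead of TCP connections, Akka 2.5-era API); take(6) is only there because a MergeHub never completes on its own.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Keep, MergeHub, Sink, Source }
import scala.concurrent.Await
import scala.concurrent.duration._

object MergeHubDemo {
  def run(): Set[Int] = {
    implicit val system: ActorSystem = ActorSystem("mergehub-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Materialize the consumer once. Its materialized value is a Sink that
      // any number of producers can be attached to, now or later.
      val (toConsumer, collected) = MergeHub
        .source[Int](16) // per-producer buffer size
        .take(6)         // a MergeHub never completes by itself
        .toMat(Sink.seq)(Keep.both)
        .run()
      // Three independently materialized producers stand in for three connections.
      Source(1 to 2).runWith(toConsumer)
      Source(3 to 4).runWith(toConsumer)
      Source(5 to 6).runWith(toConsumer)
      Await.result(collected, 3.seconds).toSet
    } finally {
      Await.ready(system.terminate(), 3.seconds)
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Applied to the question, one would presumably materialize a MergeHub.source[Msg] once and, for each accepted connection, run Source.maybe[ByteString].via(conn.flow).via(parser).runWith(toConsumer); this wiring is untested against real sockets.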