1
votes

Hi i'm creating a TCP server in Scala with the lib akka. I want to implement a restart of the tcp socket if my client doesn't send any data in a set interval. I tried with idleTimeout and a SupervisionStrategy but i can't catch the TimeoutException. On the client i see the log "Closing connection due to IO error java.io.IOException: Connection reset by peer"..

How i can resolve that? And restart the stream??

object TCPServer {

  def serverLogic(connection: IncomingConnection) (implicit system: ActorSystem): Flow[ByteString, ByteString, NotUsed] = {

    val converter: Flow[ByteString, String, NotUsed] = Flow[ByteString].map { (bytes: ByteString) =>
      val message = bytes.utf8String
      Logging.getLogger(system,this.getClass).debug(s"server received message $message")
      message
    }
    val httpOut: Flow[String, String, NotUsed] = Flow[String].map { string =>
      val answer: String = s"hello"
      answer
    }
    val responder: Flow[String, ByteString, NotUsed] = Flow[String].map { string =>
      val answer: String = s"Server responded with message [$string]"
      ByteString(answer)
    }

    Flow[ByteString]
      .idleTimeout(Duration.apply(1,"minutes"))
      .via(converter)
      .via(httpOut)
      .via(responder)
  }

  def server(address: String, port: Int)(implicit system: ActorSystem) : Unit = {

    val decider: Supervision.Decider = {  e =>
      LoggerFactory.getLogger(this.getClass).error("Failed ", e)
      Supervision.Restart
    }

    val log = Logging.getLogger(system, this)
    import system.dispatcher
    val materializerSettings = ActorMaterializerSettings(system).withSupervisionStrategy(decider)
    implicit val materializer = ActorMaterializer(materializerSettings)(system)

    val connectionHandler: Sink[IncomingConnection, Future[Done]] = Sink.foreach[Tcp.IncomingConnection] { (conn: IncomingConnection) =>
      log.debug(s"incomig connection from ${conn.remoteAddress}")
      conn.handleWith(serverLogic(conn))
    }

    val incomingConnections: Source[IncomingConnection, Future[ServerBinding]] = Tcp().bind(address, port)

    val binding: Future[ServerBinding] = incomingConnections.to(connectionHandler).run()

    binding.onComplete {
      case Success(b) =>
        log.info("Server started, listening on: " + b.localAddress)
      case Failure(e) =>
        log.error(s"Server could not bind to $address:$port: ${e.getMessage}")
        system.terminate()
    }


  }
}
1
"I want to implement a restart of the tcp socket if my client doesn't send any data in a set interval." - how should that work exactly? The server cannot make the client reconnect? It seems like something that the client would have to implement.jrudolph

1 Answers

0
votes

Take a look to the doc. available in the akka stream webbsite: https://doc.akka.io/docs/akka/2.6.0/stream/stream-error.html. With this RestartSource you can apply a policy in order to restart TCP server.