Hi i'm creating a TCP server in Scala with the lib akka. I want to implement a restart of the tcp socket if my client doesn't send any data in a set interval. I tried with idleTimeout and a SupervisionStrategy but i can't catch the TimeoutException. On the client i see the log "Closing connection due to IO error java.io.IOException: Connection reset by peer"..
How i can resolve that? And restart the stream??
object TCPServer {
def serverLogic(connection: IncomingConnection) (implicit system: ActorSystem): Flow[ByteString, ByteString, NotUsed] = {
val converter: Flow[ByteString, String, NotUsed] = Flow[ByteString].map { (bytes: ByteString) =>
val message = bytes.utf8String
Logging.getLogger(system,this.getClass).debug(s"server received message $message")
message
}
val httpOut: Flow[String, String, NotUsed] = Flow[String].map { string =>
val answer: String = s"hello"
answer
}
val responder: Flow[String, ByteString, NotUsed] = Flow[String].map { string =>
val answer: String = s"Server responded with message [$string]"
ByteString(answer)
}
Flow[ByteString]
.idleTimeout(Duration.apply(1,"minutes"))
.via(converter)
.via(httpOut)
.via(responder)
}
def server(address: String, port: Int)(implicit system: ActorSystem) : Unit = {
val decider: Supervision.Decider = { e =>
LoggerFactory.getLogger(this.getClass).error("Failed ", e)
Supervision.Restart
}
val log = Logging.getLogger(system, this)
import system.dispatcher
val materializerSettings = ActorMaterializerSettings(system).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(materializerSettings)(system)
val connectionHandler: Sink[IncomingConnection, Future[Done]] = Sink.foreach[Tcp.IncomingConnection] { (conn: IncomingConnection) =>
log.debug(s"incomig connection from ${conn.remoteAddress}")
conn.handleWith(serverLogic(conn))
}
val incomingConnections: Source[IncomingConnection, Future[ServerBinding]] = Tcp().bind(address, port)
val binding: Future[ServerBinding] = incomingConnections.to(connectionHandler).run()
binding.onComplete {
case Success(b) =>
log.info("Server started, listening on: " + b.localAddress)
case Failure(e) =>
log.error(s"Server could not bind to $address:$port: ${e.getMessage}")
system.terminate()
}
}
}