HINT: This answer is based on akka-stream-experimental version 2.0-M2. The API may be slightly different in other versions.
An easy way to close the connection is by using a PushStage:
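import akka.stream.stage._
val closeClient = new PushStage[String, String] {
  override def onPush(elem: String, ctx: Context[String]): SyncDirective = elem match {
    case "goodbye" ⇒
      // Sentinel received: finish the stream instead of forwarding the element.
      // println("Connection closed")
      ctx.finish()
    case msg ⇒
      // Any other message is passed downstream unchanged.
      ctx.push(msg)
  }
}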
Every element received on the client side or on the server side (and, in general, every element that passes through a Flow) goes through such a Stage component. In Akka, the full abstraction is called GraphStage; more information can be found in the official documentation.
With a PushStage we can inspect the value of each incoming element and then decide what to do with the context. In the example above, once the goodbye message is received we finish the context, which cancels upstream and completes downstream; otherwise we simply forward the value through the push method.
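For readers on a newer release, the same logic can be sketched as a GraphStage. This is only a sketch under assumptions: it targets the GraphStage API as it looks from Akka 2.4.2 onwards (not 2.0-M2), and the names CloseOnGoodbye and CloseOnGoodbyeDemo are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// Hypothetical GraphStage counterpart of closeClient (assumes Akka 2.4.2 or later).
final class CloseOnGoodbye extends GraphStage[FlowShape[String, String]] {
  val in: Inlet[String] = Inlet[String]("CloseOnGoodbye.in")
  val out: Outlet[String] = Outlet[String]("CloseOnGoodbye.out")
  override val shape: FlowShape[String, String] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = grab(in) match {
          // Sentinel: complete downstream and cancel upstream.
          case "goodbye" ⇒ completeStage()
          // Anything else is forwarded unchanged.
          case msg ⇒ push(out, msg)
        }
      })
      setHandler(out, new OutHandler {
        // Propagate downstream demand to upstream.
        override def onPull(): Unit = pull(in)
      })
    }
}

object CloseOnGoodbyeDemo extends App {
  implicit val system: ActorSystem = ActorSystem("demo")
  // ActorMaterializer is deprecated in Akka 2.6, where the system itself can materialize.
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  Source(List("hello", "goodbye", "ignored"))
    .via(new CloseOnGoodbye)
    .runForeach(println) // only "hello" reaches the sink
    .onComplete(_ ⇒ system.terminate())(system.dispatcher)
}
```

With such a stage, `.via(new CloseOnGoodbye)` would take the place of a `transform` call in a flow.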
Now, we can connect the closeClient component to an arbitrary flow through the transform method:
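// Framing is in akka.stream.io in the 2.0 milestones; later releases moved it to akka.stream.scaladsl.
import akka.stream.io.Framing
import akka.stream.scaladsl.{Flow, Tcp}
import akka.util.ByteString
import scala.io.StdIn
// Assumes an implicit ActorSystem and materializer, plus address and port, are in scope.
val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
  // Split the incoming byte stream into newline-delimited frames.
  .via(Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true))
  .map(_.utf8String)
  // Finish the stream as soon as "goodbye" arrives.
  .transform(() ⇒ closeClient)
  // Discard the received message and read the reply from stdin.
  .map(_ ⇒ StdIn.readLine("> "))
  .map(_ + "\n")
  .map(ByteString(_))
connection.join(flow).run()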
The flow above receives a ByteString and returns a ByteString, so it can be connected to connection through the join method. Inside the flow we first split the bytes into lines and convert each line to a string before passing it to closeClient. If the PushStage does not finish the stream, the element is forwarded to the next step, where it is discarded and replaced by a line of input from stdin, which is then sent back over the wire. If the stage does finish the stream, none of the processing steps after it run any more: the stream is now closed.
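To see what the framing and decoding steps at the start of the flow do on their own, here is a small sketch that runs them against an in-memory source instead of a TCP connection. It assumes Akka 2.4.2 or later, where Framing lives in akka.stream.scaladsl; the object name FramingDemo and the sample chunks are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Framing, Source}
import akka.util.ByteString

object FramingDemo extends App {
  implicit val system: ActorSystem = ActorSystem("framing-demo")
  // ActorMaterializer is deprecated in Akka 2.6, where the system itself can materialize.
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Chunks deliberately do not line up with message boundaries,
  // similar to what can arrive over a TCP connection.
  val chunks = List(ByteString("hel"), ByteString("lo\ngood"), ByteString("bye\n"))

  Source(chunks)
    // Reassemble the chunks into newline-delimited frames.
    .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
    // Decode each frame; the delimiter itself is not part of the frame.
    .map(_.utf8String)
    .runForeach(println) // prints "hello" and then "goodbye"
    .onComplete(_ ⇒ system.terminate())(system.dispatcher)
}
```

Because the frames arrive without the trailing newline, the comparison against "goodbye" in closeClient matches the decoded string directly.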