I'm using scala-io in my Akka actors. I need to send a request and wait for the response, but according to the official docs (http://doc.akka.io/docs/akka/snapshot/scala/io-tcp.html) the response is delivered asynchronously.

How can I wait for the response? Can I somehow use the ? (ask) pattern?

class SocketClient(remoteAddress: InetSocketAddress, listener: ActorRef) extends Actor {

  import Tcp._
  import context.system

  IO(Tcp) ! Connect(remoteAddress)

  def receive = {

    case CommandFailed(_: Connect) =>
      listener ! ConnectFailure
      context stop self

    case Connected(remote, local) =>
      listener ! ConnectSuccess

      val connection = sender
      connection ! Register(self)

      context become {

        case data: ByteString =>
          connection ! Write(data)

        case CommandFailed(w: Write) =>
          Logger.error(s"Error during writing")

        case Received(data) =>
          listener ! data

        case Disconnect =>
          connection ! Close

        case _: ConnectionClosed =>
          Logger.error(s"Connection has been closed ${remoteAddress.getAddress}")
          context stop self
      }
  }
}

Can I use something like:

connection ? Write(data)

1 Answer

Yes, but you should take into account that the ask pattern lets you receive only the first reply from an actor.

In your case that actor is the connection, which may send additional or even unrelated messages, depending on the back-pressure/acknowledgement mode you choose. For example, if you send a Write with an acknowledgement requested, the first reply may be the acknowledgement that the data was written to the socket rather than the server's response.

You can avoid it by:

In other words, the ask pattern creates its own internal actor for each message and makes it the sender, so all replies to that particular message go to this micro-actor. When it receives the first reply, the future returned by ? is completed and the internal actor is destroyed, so any further replies are ignored.
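To illustrate the "first reply wins" behaviour without pulling in Akka, here is a rough model built only on scala.concurrent.Promise. This is not Akka's actual implementation; OneShotReplyTo and AskModel are hypothetical names made up for this sketch, and the "ack"/"response" strings just stand in for a write acknowledgement and the real server reply.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for the temporary actor that ask creates:
// it completes a promise with the first reply and drops the rest.
final class OneShotReplyTo[A] {
  private val promise = Promise[A]()

  // What the caller of `?` would get back.
  def future: Future[A] = promise.future

  // trySuccess returns false if the promise was already completed,
  // which models "other replies are ignored".
  def reply(msg: A): Boolean = promise.trySuccess(msg)
}

object AskModel {
  // Returns (first reply accepted?, second reply accepted?, value of the future).
  def demo(): (Boolean, Boolean, String) = {
    val replyTo = new OneShotReplyTo[String]
    val first   = replyTo.reply("ack")      // e.g. a write acknowledgement
    val second  = replyTo.reply("response") // the reply you actually wanted
    (first, second, Await.result(replyTo.future, 1.second))
  }
}
```

In this model the future ends up holding the acknowledgement and the later response is lost, which is the situation described above.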

Also, the connection automatically sends its replies to the listener registered with the Register message, not to the sender, so you should create an intermediate actor:

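import akka.actor.{Actor, ActorRef}
import akka.io.Tcp

class Asker(connection: ActorRef) extends Actor {
  import Tcp._
  // Register this actor as the connection's listener so that replies
  // arrive here rather than at whoever sent the command.
  connection ! Register(self)

  def receive = {
    case command =>
      val requester = sender()
      connection ! command
      // Pass the next message from the connection back to the requester,
      // then go back to waiting for a new command.
      context.become {
        case reply =>
          requester ! reply
          context.unbecome()
      }
  }
}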

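trait TcpAskSupport {
  self: Actor =>

  // Reuse the Asker for this connection if it already exists; otherwise
  // create it as a child of this actor so that context.child finds it next time.
  def asker(connection: ActorRef): ActorRef = {
    val name = connection.path.name + "Asker"
    context.child(name)
      .getOrElse(context.actorOf(Props(classOf[Asker], connection), name))
  }
}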

Usage example:

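import java.net.InetSocketAddress
import scala.concurrent.duration._
import akka.actor.Actor
import akka.io.{IO, Tcp}
import akka.pattern.ask
import akka.util.{ByteString, Timeout}

class Client extends Actor with TcpAskSupport {

  import Tcp._
  import context.system
  import context.dispatcher // ExecutionContext needed by onComplete

  IO(Tcp) ! Connect(new InetSocketAddress("61.91.16.168", 80))
  implicit val timeout: Timeout = Timeout(5.seconds)

  def receive = {
    case CommandFailed(_: Connect) =>
      println("connect failed")
      context stop self

    case c @ Connected(remote, local) =>
      println("Connected" + c)
      val connection = sender()
      // NoAck: no write acknowledgement is requested, so the first reply
      // the Asker forwards should be the data received from the server.
      (asker(connection) ? Write(ByteString("GET /\n", "UTF-8"), NoAck)).onComplete {
        case x => println("Response" + x)
      }

    case x => println("[ERROR] Received" + x)
  }
}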