11
votes

I apologize in advance if this seems at all confusing, as I'm dumping quite a bit here. Basically, I have a small service grabbing some Json, parsing and extracting it to case class(es), then writing it to a database. This service needs to run on a schedule, which is being handled well by an Akka scheduler. My database doesn't like when Slick tries to ask for a new AutoInc id at the same time, so I built in an Await.result to block that from happening. All of this works quite well, but my issue starts here: there are 7 of these services running, so I would like to block each one using a similar Await.result system. Every time I try to send the end time of the request back as a response (at the end of the else block), it gets sent to dead letters instead of to the Distributor. Basically: why does sender ! time go to dead letters and not to Distributor. This is a long question for a simple problem, but that's how development goes...

ClickActor.scala

    import java.text.SimpleDateFormat
    import java.util.Date
    import Message._
    import akka.actor.{Actor, ActorLogging, Props}
    import akka.util.Timeout
    import com.typesafe.config.ConfigFactory
    import net.liftweb.json._
    import spray.client.pipelining._
    import spray.http.{BasicHttpCredentials, HttpRequest, HttpResponse, Uri}
    import akka.pattern.ask
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

case class ClickData(recipient : String, geolocation : Geolocation, tags : Array[String],
                     url : String, timestamp : Double, campaigns : Array[String],
                     `user-variables` : JObject, ip : String,
                     `client-info` : ClientInfo, message : ClickedMessage, event : String)
  case class Geolocation(city : String, region : String, country : String)
  case class ClientInfo(`client-name`: String, `client-os`: String, `user-agent`: String,
                      `device-type`: String, `client-type`: String)
  case class ClickedMessage(headers : ClickHeaders)
    case class ClickHeaders(`message-id` : String)

class ClickActor extends Actor with ActorLogging{

  implicit val formats = DefaultFormats
  implicit val timeout = new Timeout(3 minutes)
  import context.dispatcher

  val con = ConfigFactory.load("connection.conf")
  val countries = ConfigFactory.load("country.conf")
  val regions = ConfigFactory.load("region.conf")

  val df = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss -0000")
  var time = System.currentTimeMillis()
  var begin = new Date(time - (12 hours).toMillis)
  var end = new Date(time)

  val pipeline : HttpRequest => Future[HttpResponse] = (
    addCredentials(BasicHttpCredentials("api", con.getString("mailgun.key")))
      ~> sendReceive
    )

  def get(lastrun : Long): Future[String] = {

    if(lastrun != 0) {
      begin = new Date(lastrun)
      end = new Date(time)
    }

    val uri = Uri(con.getString("mailgun.uri")) withQuery("begin" -> df.format(begin), "end" -> df.format(end),
      "ascending" -> "yes", "limit" -> "100", "pretty" -> "yes", "event" -> "clicked")
    val request = Get(uri)
    val futureResponse = pipeline(request)
    return futureResponse.map(_.entity.asString)
  }

  def receive = {
    case lastrun : Long => {
      val start = System.currentTimeMillis()
      val responseFuture = get(lastrun)
      responseFuture.onSuccess {
        case payload: String => val json = parse(payload)
          //println(pretty(render(json)))
          val elements = (json \\ "items").children
          if (elements.length == 0) {
            log.info("[ClickActor: " + this.hashCode() + "] did not find new events between " +
              begin.toString + " and " + end.toString)
            sender ! time
            context.stop(self)
          }
          else {
            for (item <- elements) {
              val data = item.extract[ClickData]
              var tags = ""
              if (data.tags.length != 0) {
                for (tag <- data.tags)
                  tags += (tag + ", ")
              }
              var campaigns = ""
              if (data.campaigns.length != 0) {
                for (campaign <- data.campaigns)
                  campaigns += (campaign + ", ")
              }
              val timestamp = (data.timestamp * 1000).toLong
              val msg = new ClickMessage(
                data.recipient, data.geolocation.city,
                regions.getString(data.geolocation.country + "." + data.geolocation.region),
                countries.getString(data.geolocation.country), tags, data.url, timestamp,
                campaigns, data.ip, data.`client-info`.`client-name`,
                data.`client-info`.`client-os`, data.`client-info`.`user-agent`,
                data.`client-info`.`device-type`, data.`client-info`.`client-type`,
                data.message.headers.`message-id`, data.event, compactRender(item))
              val csqla = context.actorOf(Props[ClickSQLActor])
              val future = csqla.ask(msg)
              val result = Await.result(future, timeout.duration).asInstanceOf[Int]
              if (result == 1) {
                log.error("[ClickSQLActor: " + csqla.hashCode() + "] shutting down due to lack of system environment variables")
                context.stop(csqla)
              }
              else if(result == 0) {
                log.info("[ClickSQLActor: " + csqla.hashCode() + "] successfully wrote to the DB")
              }
            }
            sender ! time
            log.info("[ClickActor: " + this.hashCode() + "] processed |" + elements.length + "| new events in " +
              (System.currentTimeMillis() - start) + " ms")
          }
      }
    }
  }
}

Distributor.scala

import akka.actor.{Props, ActorSystem}
import akka.event.Logging
import akka.util.Timeout
import akka.pattern.ask
import scala.concurrent.duration._
import scala.concurrent.Await

class Distributor {

  implicit val timeout = new Timeout(10 minutes)
  var lastClick : Long = 0

  def distribute(system : ActorSystem) = {
    val log = Logging(system, getClass)

    val clickFuture = (system.actorOf(Props[ClickActor]) ? lastClick)
    lastClick = Await.result(clickFuture, timeout.duration).asInstanceOf[Long]
    log.info(lastClick.toString)

    //repeat process with other events (open, unsub, etc)
  }
}
1

1 Answers

20
votes

The reason is because the value of 'sender' (which is a method that retrieves the value) is no longer valid after leaving the receive block, yet the future that is being used in the above example will still be running and by the time that it finishes the actor will have left the receive block and bang; an invalid sender results in the message going to the dead letter queue.

The fix is to either not use a future, or when combining futures, actors and sender then capture the value of sender before you trigger the future.

val s = sender

val responseFuture = get(lastrun)
    responseFuture.onSuccess {    
    ....
    s ! time
}