You could implement your own mailbox; this approach does not require any changes to your actor implementation. See this answer for a solution that changes the actor implementation instead of using a custom mailbox.
Implementation of a mailbox that drops old messages on enqueue:
    package akka.actor.test

    import java.util.concurrent.atomic.AtomicReference

    import akka.actor.{ActorRef, ActorSystem, DeadLetter, InternalActorRef}
    import akka.dispatch.{Envelope, MailboxType, MessageQueue}
    import com.typesafe.config.Config

    class SingleMessageMailbox extends MailboxType {

      // Akka instantiates the mailbox type through this constructor signature.
      def this(settings: ActorSystem.Settings, config: Config) = this()

      final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
        new MessageQueue {
          // Single slot holding the most recent message; null means empty.
          val message = new AtomicReference[Envelope]

          // On shutdown, forward a still-pending message to dead letters.
          final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit =
            Option(message.get) foreach { deadLetters.enqueue(owner, _) }

          // Store the new message; a displaced older one goes to dead letters.
          def enqueue(receiver: ActorRef, handle: Envelope): Unit =
            for (e <- Option(message.getAndSet(handle)))
              receiver.asInstanceOf[InternalActorRef]
                .provider.deadLetters
                .tell(DeadLetter(e.message, e.sender, receiver), e.sender)

          def dequeue(): Envelope = message.getAndSet(null)

          def numberOfMessages: Int = Option(message.get).size

          def hasMessages: Boolean = message.get != null
        }
    }
Note that I had to put this class in a package under akka (here akka.actor.test) because InternalActorRef is private[akka], and I use it to send the old message to dead letters, the same way BoundedQueueBasedMessageQueue does.
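The core of the queue is a single AtomicReference used as a one-element slot. As a minimal sketch of just that mechanism, with no Akka dependency, the following shows how getAndSet both stores the new element and hands back the one it displaced. SingleSlot and SingleSlotDemo are hypothetical names introduced for illustration only.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper, not part of Akka: a one-element "queue" in which a
// new element replaces whatever is currently waiting.
final class SingleSlot[A <: AnyRef] {
  private val slot = new AtomicReference[A] // null means "empty"

  // Stores `a` and returns the element it displaced, if any.
  def offer(a: A): Option[A] = Option(slot.getAndSet(a))

  // Removes and returns the waiting element, if any.
  def poll(): Option[A] = Option(slot.getAndSet(null.asInstanceOf[A]))

  def nonEmpty: Boolean = slot.get != null
}

object SingleSlotDemo {
  def main(args: Array[String]): Unit = {
    val s = new SingleSlot[String]
    println(s.offer("a")) // None: nothing was waiting
    println(s.offer("b")) // Some(a): "a" was displaced
    println(s.poll())     // Some(b)
    println(s.poll())     // None: the slot is empty again
  }
}
```

In the mailbox, the displaced element returned by getAndSet is what gets forwarded to dead letters.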
If you just want to discard old messages silently, without dead letters, you could implement enqueue like this:
    def enqueue(receiver: ActorRef, handle: Envelope): Unit = message.set(handle)
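With this variant InternalActorRef is no longer needed, so the class should not have to live under the akka package. A sketch of the complete silent-drop mailbox, assuming a hypothetical package com.example.mailbox and a hypothetical class name LatestOnlyMailbox, could look like this:

```scala
package com.example.mailbox // hypothetical package; no akka-private access is used

import java.util.concurrent.atomic.AtomicReference

import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch.{Envelope, MailboxType, MessageQueue}
import com.typesafe.config.Config

// Hypothetical name: keeps only the newest message, silently overwriting older ones.
class LatestOnlyMailbox extends MailboxType {

  // Constructor signature Akka uses when creating the mailbox type from config.
  def this(settings: ActorSystem.Settings, config: Config) = this()

  override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new MessageQueue {
      private val message = new AtomicReference[Envelope] // null means empty

      // Overwrite whatever is waiting; the old message is simply forgotten.
      def enqueue(receiver: ActorRef, handle: Envelope): Unit = message.set(handle)

      def dequeue(): Envelope = message.getAndSet(null)

      def numberOfMessages: Int = if (message.get == null) 0 else 1

      def hasMessages: Boolean = message.get != null

      // A message still pending at shutdown is handed to dead letters.
      def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit =
        Option(message.getAndSet(null)).foreach(deadLetters.enqueue(owner, _))
    }
}
```

Under these assumptions the configuration entry would name the class as "com.example.mailbox.LatestOnlyMailbox".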
Usage:
    object Test extends App {
      import akka.actor._
      import com.typesafe.config.ConfigFactory

      val actorSystem: ActorSystem =
        ActorSystem("default", ConfigFactory.parseString(
          """
            akka.daemonic=on
            myMailbox.mailbox-type = "akka.actor.test.SingleMessageMailbox"
          """))

      class EchoActor extends Actor {
        def receive = {
          case m => println(m); Thread.sleep(500)
        }
      }

      val actor = actorSystem.actorOf(Props[EchoActor].withMailbox("myMailbox"))

      for {i <- 1 to 10} {
        actor ! i
        Thread.sleep(100)
      }

      Thread.sleep(1000)
    }
Test:
    $ sbt run
    1
    [INFO] <dead letters log>
    [INFO] <dead letters log>
    [INFO] <dead letters log>
    5
    [INFO] <dead letters log>
    [INFO] <dead letters log>
    [INFO] <dead letters log>
    [INFO] <dead letters log>
    10
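To see why only 1, 5 and 10 get printed, here is a rough, Akka-free simulation of the timing in the usage example: one message every 100 ms, 500 ms of processing per message, and a single-slot mailbox. It is only a sketch under idealized assumptions: no scheduling jitter, and when the actor becomes free at the same instant a message is sent, the actor is assumed to dequeue first. SingleSlotTimeline and simulate are hypothetical names.

```scala
object SingleSlotTimeline {
  // Returns (processed, dropped) message numbers for an idealized timeline.
  def simulate(count: Int, sendEveryMs: Int, processMs: Int): (List[Int], List[Int]) = {
    var slot = Option.empty[Int] // the single pending message, if any
    var freeAt = 0               // time at which the consumer becomes idle
    val processed = List.newBuilder[Int]
    val dropped = List.newBuilder[Int]

    for (i <- 1 to count) {
      val now = (i - 1) * sendEveryMs

      // The consumer became free at or before `now`: it takes the pending message first.
      if (freeAt <= now && slot.nonEmpty) {
        processed += slot.get
        slot = None
        freeAt += processMs
      }

      // Enqueue message i; whatever was still waiting is displaced.
      slot.foreach(dropped += _)
      slot = Some(i)

      // An idle consumer picks the new message up immediately.
      if (freeAt <= now) {
        processed += i
        slot = None
        freeAt = now + processMs
      }
    }

    // After the last send, the consumer eventually handles what is left.
    slot.foreach(processed += _)
    (processed.result(), dropped.result())
  }

  def main(args: Array[String]): Unit = {
    val (processed, dropped) = simulate(count = 10, sendEveryMs = 100, processMs = 500)
    println(processed) // List(1, 5, 10)
    println(dropped)   // List(2, 3, 4, 6, 7, 8, 9)
  }
}
```

The three dropped messages before 5 and the four before 10 correspond to the dead letter log lines in the output above. On a real system the boundary cases can shift by one message depending on thread scheduling.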
See also akka/Mailboxes.