
My colleague and I are puzzled by the different behaviour the DistributedPubSubMediator shows when subscribing/unsubscribing directly versus via a proxy Actor. We put together the test below to show the difference.

From our understanding, ActorRef.forward should pass on the original sender, so it should not matter whether the message is sent to the Mediator directly or via a proxy Actor. See http://www.scala-lang.org/api/current/index.html#scala.actors.ActorRef.
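
To make the expectation concrete, here is a minimal sketch of the difference between `forward` and a plain `!` as seen by the receiving actor. It assumes classic Akka actors of the 2.2/2.3 line used in this question (hence `system.shutdown()`); `Target`, `Relay`, `Origin` and `senderSeenByTarget` are hypothetical names made up for this illustration.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object ForwardDemo {
  // Hypothetical target: records the name of the actor it sees as sender.
  class Target(seen: Promise[String]) extends Actor {
    def receive = { case _ => seen.trySuccess(sender.path.name) }
  }

  // Hypothetical relay: `forward` keeps the original sender,
  // while `!` makes the relay itself the sender.
  class Relay(target: ActorRef, useForward: Boolean) extends Actor {
    def receive = {
      case msg => if (useForward) target forward msg else target ! msg
    }
  }

  // Hypothetical origin: sends through the relay with itself as sender.
  class Origin(relay: ActorRef) extends Actor {
    def receive = { case msg => relay ! msg }
  }

  def senderSeenByTarget(useForward: Boolean): String = {
    val system = ActorSystem("forward-demo")
    try {
      val seen = Promise[String]()
      val target = system.actorOf(Props(new Target(seen)), "target")
      val relay = system.actorOf(Props(new Relay(target, useForward)), "relay")
      val origin = system.actorOf(Props(new Origin(relay)), "origin")
      origin ! "ping"
      Await.result(seen.future, 5.seconds)
    } finally system.shutdown() // assumption: Akka 2.2/2.3 API
  }
}
```

With `useForward = true` the target sees `origin` as the sender; with `useForward = false` it sees `relay`. Note that this only helps if the message reaching the relay carries a meaningful sender in the first place.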

As a workaround, we have had to extend the DistributedPubSubMediator class and include the logic that the DistributedPubSubMediator object already provides. Ideally, we would prefer to use the object directly and revert our code.

This seems like a bug. Does anyone know the underlying reason for this unusual behaviour? Please help...

[22-Oct-2013] The test has been updated based on Roland's answer (thank you), and expectMsgType checks for SubscribeAck and UnsubscribeAck have been added. We now receive the SubscribeAck but, strangely, not the UnsubscribeAck. It is not a major issue, but we would like to know why.

Another question, if we may ask it here as well: is it good practice to subscribe remote actors to the DistributedPubSubMediator via a proxy Actor running in the same ActorSystem?

At the moment we have:

  1. The subscribing App discovers the publishing App (in a non-Akka way) and gets its Cluster address.
  2. The remote subscriber uses this address and the proxy actor's known path to send an Identify request.
  3. The remote subscriber gets the ActorIdentity response and then subscribes/unsubscribes via this (remote) proxy.
  4. On the publisher App, Subscribe/Unsubscribe messages are forwarded to the DistributedPubSubMediator, which is then used to publish subsequent business messages.
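
Steps 2 and 3 above could look roughly like the following on the subscriber side. This is only a sketch: `RemoteSubscriber` is a hypothetical name, and `proxyPath` is assumed to be the full path of the proxy built from the discovered address (something like `akka.tcp://publisher@host:2552/user/proxy`). Retry and unsubscribe handling are left out.

```scala
import akka.actor.{Actor, ActorIdentity, Identify}
import akka.contrib.pattern.DistributedPubSubMediator.{Subscribe, SubscribeAck}

// Hypothetical subscriber running in the subscribing App.
class RemoteSubscriber(proxyPath: String, topic: String) extends Actor {

  // Step 2: ask whatever lives at the known proxy path to identify itself.
  // The path doubles as the correlation id of the request.
  override def preStart(): Unit =
    context.actorSelection(proxyPath) ! Identify(proxyPath)

  def receive = {
    // Step 3: the proxy answered, so subscribe through it.
    case ActorIdentity(`proxyPath`, Some(proxy)) =>
      proxy ! Subscribe(topic, self)

    // No actor at that path; this sketch simply gives up.
    case ActorIdentity(`proxyPath`, None) =>
      context.stop(self)

    case _: SubscribeAck =>
      context.become(subscribed)
  }

  def subscribed: Receive = {
    case msg => () // business messages published to the topic arrive here
  }
}
```

Because the subscriber sends from inside an actor, `self` is the implicit sender, so a proxy that forwards the Subscribe lets the SubscribeAck travel straight back to the subscriber.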

We are not joining the Cluster as per the Akka Reactor pubsub chat client example (i.e. only using the DistributedPubSubMediator to publish) because we need to handle failover on the Publisher side.

[5-Nov-2013] Added a test for the Send message. It does not seem to work, and we have not figured out why yet.

package star.common.pubsub

import org.scalatest.{BeforeAndAfterAll, FunSuite}

import org.junit.runner.RunWith

import akka.contrib.pattern.DistributedPubSubExtension
import akka.contrib.pattern.DistributedPubSubMediator._
import akka.testkit.TestKit
import akka.actor.{Actor, ActorSystem, ActorRef, Props}
import scala.concurrent.duration._

import com.typesafe.config.ConfigFactory

object MediatorTest {
  val config = ConfigFactory.parseString(s"""
                              akka.actor.provider="akka.cluster.ClusterActorRefProvider"
                              akka.remote.netty.tcp.port=0
                              akka.extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]
                              """)
}

@RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MediatorTest extends TestKit(ActorSystem("test", MediatorTest.config)) with FunSuite {

  val mediator = DistributedPubSubExtension(system).mediator
  val topic = "example"
  val message = "Published Message"
  //  val joinAddress = Cluster(system).selfAddress
  //  Cluster(system).join(joinAddress)

  test("Direct subscribe to mediator") {
    mediator.!(Subscribe(topic, testActor))(testActor)
    expectMsgType[SubscribeAck](5 seconds)

    mediator.!(Publish(topic, message))(testActor)
    expectMsg(2 seconds, message)

    mediator.!(Unsubscribe(topic, testActor))(testActor)
    expectMsgType[UnsubscribeAck](5 seconds)

    mediator ! Publish(topic, message)
    expectNoMsg(2 seconds)
  }


  test("Subscribe to mediator via proxy") {
    class Proxy extends Actor {
      override def receive = {
        case subscribe: Subscribe =>
          mediator forward subscribe

        case unsubscribe: Unsubscribe =>
          mediator forward unsubscribe

        case publish: Publish =>
          mediator.!(publish)
      }
    } 

    val proxy = system.actorOf(Props(new Proxy), "proxy")

    proxy.!(Subscribe(topic,testActor))(testActor)
    expectMsgType[SubscribeAck](2 seconds)

    proxy ! Publish(topic, message)
    expectMsg(5 seconds, message)

    proxy.!(Unsubscribe(topic,testActor))(testActor)
    expectMsgType[UnsubscribeAck](5 seconds)

    proxy ! Publish(topic, message)
    expectNoMsg(5 seconds)
  }

  test("Send message to address") {

    val testActorAddress = testActor.path.toString
    //    val system2 = ActorSystem("test", MediatorTest.config)
    //    Cluster(system2).join(joinAddress)

    mediator.!(Subscribe(topic, testActor))(testActor)
    expectMsgType[SubscribeAck](5 seconds)

    println(testActorAddress) // akka://test/system/testActor1

    mediator.!(Publish(topic, message))(testActor)
    expectMsg(2 seconds, message)

    mediator ! Send(testActorAddress, message, false)

    expectMsg(5 seconds, message)
  }
}
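
One thing that may be worth checking for the Send test above: as far as the mediator's documentation goes, Send does not deliver to topic subscribers. It delivers to actors registered with `Put`, and the key is the actor's path without address information, whereas `testActor.path.toString` includes the `akka://test` prefix. The following is a sketch under those assumptions, not a verified fix; `SendSketch` and `run` are hypothetical names, and `system.shutdown()` assumes the Akka 2.2/2.3 API.

```scala
import akka.actor.ActorSystem
import akka.contrib.pattern.DistributedPubSubExtension
import akka.contrib.pattern.DistributedPubSubMediator.{Put, Send}
import akka.testkit.TestKit
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

object SendSketch {
  // Same settings as the test above, minus the explicit extension entry.
  val config = ConfigFactory.parseString("""
    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
    akka.remote.netty.tcp.port = 0
    """)

  def run(): String = {
    val kit = new TestKit(ActorSystem("send-sketch", config))
    import kit._
    try {
      val mediator = DistributedPubSubExtension(system).mediator

      // Register the (local) actor with the mediator instead of subscribing it.
      mediator ! Put(testActor)

      // Path elements only, e.g. "/system/testActor1", without "akka://...".
      val path = testActor.path.elements.mkString("/", "/", "")

      mediator ! Send(path, "hello", localAffinity = false)
      expectMsg(5.seconds, "hello")
    } finally system.shutdown() // assumption: Akka 2.2/2.3 API
  }
}
```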

1 Answer

Two things:

  • whether or not you use forward does not matter much, since you do not have a useful sender in scope in your test procedure (you are not mixing in ImplicitSender); but this is not the problem
  • you are not forwarding the Publish message, which is why it does not publish the message
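
As a rough illustration of the second point, a proxy that forwards all three message types might look like this. `ForwardingProxy` is a hypothetical name, and the mediator is passed in as a constructor argument here purely to keep the sketch self-contained.

```scala
import akka.actor.{Actor, ActorRef}
import akka.contrib.pattern.DistributedPubSubMediator.{Publish, Subscribe, Unsubscribe}

// Hypothetical proxy: every mediator message is forwarded, so the mediator
// sees the original sender and any acks go back to that sender.
class ForwardingProxy(mediator: ActorRef) extends Actor {
  def receive = {
    case msg @ (_: Subscribe | _: Unsubscribe | _: Publish) =>
      mediator forward msg
  }
}
```

For the first point, mixing `akka.testkit.ImplicitSender` into the test class makes `testActor` the implicit sender, so `proxy ! Subscribe(topic, testActor)` then behaves like `proxy.!(Subscribe(topic, testActor))(testActor)`.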