2
votes

I'm currently trying to implement a clustered play + akka implementation with an auto discovery service. However, I seem to be running into issues with the Guice DI loader that's included with play. The excerpt from their documentation states:

https://www.playframework.com/documentation/2.5.x/ScalaAkka#Integrating-with-Akka

While we recommend you use the built in actor system, as it sets up everything such as the correct classloader, lifecycle hooks, etc, there is nothing stopping you from using your own actor system. It is important however to ensure you do the following:

Register a stop hook to shut the actor system down when Play shuts down Pass in the correct classloader from the Play Environment otherwise Akka won’t be able to find your applications classes

Ensure that either you change the location that Play reads it’s akka configuration from using play.akka.config, or that you don’t read your akka configuration from the default akka config, as this will cause problems such as when the systems try to bind to the same remote ports

I have done the above configuration that they recommend however I can't seem to get around play still binding it's internal ActorSystemProvider from the BuiltInModule:

class BuiltinModule extends Module {
def bindings(env: Environment, configuration: Configuration): Seq[Binding[_]] = 

    {
        def dynamicBindings(factories: ((Environment, Configuration) => Seq[Binding[_]])*) = {
          factories.flatMap(_(env, configuration))
        }

        Seq(
          bind[Environment] to env,
          bind[ConfigurationProvider].to(new ConfigurationProvider(configuration)),
          bind[Configuration].toProvider[ConfigurationProvider],
          bind[HttpConfiguration].toProvider[HttpConfiguration.HttpConfigurationProvider],

          // Application lifecycle, bound both to the interface, and its implementation, so that Application can access it
          // to shut it down.
          bind[DefaultApplicationLifecycle].toSelf,
          bind[ApplicationLifecycle].to(bind[DefaultApplicationLifecycle]),

          bind[Application].to[DefaultApplication],
          bind[play.Application].to[play.DefaultApplication],

          bind[Router].toProvider[RoutesProvider],
          bind[play.routing.Router].to[JavaRouterAdapter],
          bind[ActorSystem].toProvider[ActorSystemProvider],
          bind[Materializer].toProvider[MaterializerProvider],
          bind[ExecutionContextExecutor].toProvider[ExecutionContextProvider],
          bind[ExecutionContext].to[ExecutionContextExecutor],
          bind[Executor].to[ExecutionContextExecutor],
          bind[HttpExecutionContext].toSelf,

          bind[CryptoConfig].toProvider[CryptoConfigParser],
          bind[CookieSigner].toProvider[CookieSignerProvider],
          bind[CSRFTokenSigner].toProvider[CSRFTokenSignerProvider],
          bind[AESCrypter].toProvider[AESCrypterProvider],
          bind[play.api.libs.Crypto].toSelf,
          bind[TemporaryFileCreator].to[DefaultTemporaryFileCreator]
        ) ++ dynamicBindings(
            HttpErrorHandler.bindingsFromConfiguration,
            HttpFilters.bindingsFromConfiguration,
            HttpRequestHandler.bindingsFromConfiguration,
            ActionCreator.bindingsFromConfiguration
          )
      }
    }

I have tried creating my own GuiceApplicationBuilder in order to bypass this however, now it just moves the duplicate binding exception to come from the BuiltInModule instead.

Here's what I'm trying:

AkkaConfigModule:

package module.akka

import com.google.inject.{AbstractModule, Inject, Provider, Singleton}
import com.typesafe.config.Config
import module.akka.AkkaConfigModule.AkkaConfigProvider
import net.codingwell.scalaguice.ScalaModule
import play.api.Application

/**
  * Created by dmcquill on 8/15/16.
  */
object AkkaConfigModule {
    @Singleton
    class AkkaConfigProvider @Inject() (application: Application) extends Provider[Config] {
        override def get() = {
            val classLoader = application.classloader
            NodeConfigurator.loadConfig(classLoader)
        }
    }
}

/**
  * Binds the application configuration to the [[Config]] interface.
  *
  * The config is bound as an eager singleton so that errors in the config are detected
  * as early as possible.
  */
class AkkaConfigModule extends AbstractModule with ScalaModule {

    override def configure() {
        bind[Config].toProvider[AkkaConfigProvider].asEagerSingleton()
    }

}

ActorSystemModule:

package module.akka


import actor.cluster.ClusterMonitor
import akka.actor.ActorSystem
import com.google.inject._
import com.typesafe.config.Config
import net.codingwell.scalaguice.ScalaModule
import play.api.inject.ApplicationLifecycle

import scala.collection.JavaConversions._

/**
  * Created by dmcquill on 7/27/16.
  */
object ActorSystemModule {
    @Singleton
    class ActorSystemProvider @Inject() (val lifecycle: ApplicationLifecycle, val config: Config, val injector: Injector) extends Provider[ActorSystem] {
        override def get() = {
            val system = ActorSystem(config.getString(NodeConfigurator.CLUSTER_NAME_PROP), config.getConfig("fitnessApp"))

            // add the GuiceAkkaExtension to the system, and initialize it with the Guice injector
            GuiceAkkaExtension(system).initialize(injector)

            system.log.info("Configured seed nodes: " + config.getStringList("fitnessApp.akka.cluster.seed-nodes").mkString(", "))
            system.actorOf(GuiceAkkaExtension(system).props(ClusterMonitor.name))

            lifecycle.addStopHook { () =>
                system.terminate()
            }

            system
        }
    }
}

/**
  * A module providing an Akka ActorSystem.
  */
class ActorSystemModule extends AbstractModule with ScalaModule {
    import module.akka.ActorSystemModule.ActorSystemProvider

    override def configure() {
        bind[ActorSystem].toProvider[ActorSystemProvider].asEagerSingleton()
    }
}

Application Loader:

class CustomApplicationLoader extends GuiceApplicationLoader {

    override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = {
        initialBuilder
            .overrides(overrides(context): _*)
            .bindings(new AkkaConfigModule, new ActorSystemModule)
    }

}

The main thing I need to accomplish is to configure the ActorSystem so that I can load the seed nodes of the Akka cluster programmatically.

Is the above approach the right approach or is there a better way to accomplish this? If this is the right approach is there something I'm fundamentally not understanding with the DI setup for play/guice?

Update

For this architecture, play+akka are located on the same node.

2

2 Answers

4
votes

In the end I ended up trying to do something a bit more complicated than necessary. Instead of doing the above flow I simply extended the initial configuration programmatically so that I could retrieve the necessary networking information programmatically.

The end result essentially consisted of a few classes:

NodeConfigurator: This class contains relevant utility methods used to retrieve the properties from the application.conf and then create a config programmatically to be used in conjunction with a kubernetes discovery service.

object NodeConfigurator {

    /**
      * This method given a class loader will return the configuration object for an ActorSystem
      * in a clustered environment
      *
      * @param classLoader the configured classloader of the application
      * @return Config
      */
    def loadConfig(classLoader: ClassLoader) = {
        val config = ConfigFactory.load(classLoader)

        val clusterName = config.getString(CLUSTER_NAME_PROP)
        val seedPort = config.getString(SEED_PORT_CONF_PROP)

        val host = if (config.getString(HOST_CONF_PROP) equals "eth0-address-or-localhost") {
            getLocalHostAddress.getOrElse(DEFAULT_HOST_ADDRESS)
        } else {
            config.getString(HOST_CONF_PROP)
        }

        ConfigFactory.parseString(formatSeedNodesConfig(clusterName, getSeedNodes(config), seedPort, host))
            .withValue(HOST_CONF_PROP, ConfigValueFactory.fromAnyRef(host))
            .withValue("fitnessApp.akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(host))
            .withFallback(config)
            .resolve()
    }

    /**
      * Get the local ip address which defaults to localhost if not
      * found on the eth0 adapter
      *
      * @return Option[String]
      */
    def getLocalHostAddress:  Option[String] = {
        import java.net.NetworkInterface

        import scala.collection.JavaConversions._

        NetworkInterface.getNetworkInterfaces
            .find(_.getName equals "eth0")
            .flatMap { interface =>
                interface.getInetAddresses.find(_.isSiteLocalAddress).map(_.getHostAddress)
            }
    }

    /**
      * Retrieves a set of seed nodes that are currently running in our cluster
      *
      * @param config akka configuration object
      * @return Array[String]
      */
    def getSeedNodes(config: Config) = {
        if(config.hasPath(SEED_NODES_CONF_PROP)) {
            config.getString(SEED_NODES_CONF_PROP).split(",").map(_.trim)
        } else {
            Array.empty[String]
        }
    }

    /**
      * formats the seed node addresses in the proper format
      *
      * @param clusterName name of akka cluster
      * @param seedNodeAddresses listing of current seed nodes
      * @param seedNodePort configured seed node port
      * @param defaultSeedNodeAddress default seed node address
      * @return
      */
    def formatSeedNodesConfig(clusterName: String, seedNodeAddresses: Array[String], seedNodePort: String, defaultSeedNodeAddress: String) = {
        if(seedNodeAddresses.isEmpty) {
            s"""fitnessApp.akka.cluster.seed-nodes = [ "akka.tcp://$clusterName@$defaultSeedNodeAddress:$seedNodePort" ]"""
        } else {
            seedNodeAddresses.map { address =>
                s"""fitnessApp.akka.cluster.seed-nodes += "akka.tcp://$clusterName@$address:$seedNodePort""""
            }.mkString("\n")
        }
    }

    val CLUSTER_NAME_PROP = "fitnessAkka.cluster-name"
    val HOST_CONF_PROP = "fitnessAkka.host"
    val PORT_CONF_PROP = "fitnessAkka.port"
    val SEED_NODES_CONF_PROP = "fitnessAkka.seed-nodes"
    val SEED_PORT_CONF_PROP = "fitnessAkka.seed-port"

    private val DEFAULT_HOST_ADDRESS = "127.0.0.1"
}

CustomApplicationLoader: Simply uses the overridable application loader of play to take in the produced config from from NodeConfigurator and then extend the initialConfiguration with it.

class CustomApplicationLoader extends GuiceApplicationLoader {

    override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = {
        val classLoader = context.environment.classLoader
        val configuration = Configuration(NodeConfigurator.loadConfig(classLoader))

        initialBuilder
                .in(context.environment)
                .loadConfig(context.initialConfiguration ++ configuration)
                .overrides(overrides(context): _*)
    }

}

AkkaActorModule: Provides a dependency injectable actor ref for use with an API to display the cluster members.

class AkkaActorModule extends AbstractModule with AkkaGuiceSupport {
    def configure = {
        bindActor[ClusterMonitor]("cluster-monitor")
    }
}

ClusterMonitor: This is an actor that is simply listening to cluster events and additionally receives messages to produce the current cluster state.

class ClusterMonitor @Inject() extends Actor with ActorLogging {
    import actor.cluster.ClusterMonitor.GetClusterState

    val cluster = Cluster(context.system)
    private var nodes = Set.empty[Address]

    override def preStart(): Unit = {
        cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
    }

    override def postStop(): Unit = cluster.unsubscribe(self)

    override def receive = {
        case MemberUp(member) => {
            nodes += member.address
            log.info(s"Cluster member up: ${member.address}")
        }
        case UnreachableMember(member) => log.warning(s"Cluster member unreachable: ${member.address}")
        case MemberRemoved(member, previousStatus) => {
            nodes -= member.address
            log.info(s"Cluster member removed: ${member.address}")
        }
        case MemberExited(member) => log.info(s"Cluster member exited: ${member.address}")
        case GetClusterState => sender() ! nodes
        case _: MemberEvent =>
    }

}

object ClusterMonitor {
    case class GetClusterState()
}

Application: Simply a test controller to output a list of nodes that have joined the cluster

class Application @Inject() (@Named("cluster-monitor") clusterMonitorRef: ActorRef) extends Controller {

    implicit val addressWrites = new Writes[Address] {
        def writes(address: Address) = Json.obj(
            "host" -> address.host,
            "port" -> address.port,
            "protocol" -> address.protocol,
            "system" -> address.system
        )
    }

    implicit val timeout = Timeout(5, TimeUnit.SECONDS)

    def listClusterNodes = Action.async {
        (clusterMonitorRef ? GetClusterState).mapTo[Set[Address]].map { addresses =>
            Ok(Json.toJson(addresses))
        }
    }

}

The result of the above controller produces output similar to the below:

$ http GET 192.168.99.100:30760/cluster/nodes

HTTP/1.1 200 OK
Content-Length: 235
Content-Type: application/json
Date: Thu, 18 Aug 2016 02:50:30 GMT

[
    {
        "host": "172.17.0.3", 
        "port": 2551, 
        "protocol": "akka.tcp", 
        "system": "fitnessApp"
    }, 
    {
        "host": "172.17.0.4", 
        "port": 2551, 
        "protocol": "akka.tcp", 
        "system": "fitnessApp"
    }, 
    {
        "host": "172.17.0.5", 
        "port": 2551, 
        "protocol": "akka.tcp", 
        "system": "fitnessApp"
    }
]
0
votes

There is a nice example from lightbend http://www.lightbend.com/activator/template/play-akka-cluster-sample you can download the sample example and reused this.