Since Spark is built on top of Akka, I want to use Akka to send and receive messages between Spark clusters.

Following this example, https://github.com/jaceklaskowski/spark-activator/blob/master/src/main/scala/StreamingApp.scala, I can run StreamingApp locally and have it send messages to its own actorStream.

I then tried to move the sender part into an application running against another Spark master, and to send messages from there to the remote actor in StreamingApp. The code is as follows:

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

object SenderApp extends Serializable {

    def main(args: Array[String]) {

        // Host and port of the driver that runs StreamingApp (the receiving side)
        val driverPort = 12345
        val driverHost = "xxxx"
        val conf = new SparkConf(false)
            .setMaster("spark://localhost:8888") // Connecting to my spark master
            .setAppName("Spark Akka Streaming Sender")
            .set("spark.logConf", "true")
            .set("spark.akka.logLifecycleEvents", "true")
        val actorName = "helloer"

        // Creating the context initializes SparkEnv, which holds the actor system
        val sc = new SparkContext(conf)

        val actorSystem = SparkEnv.get.actorSystem

        // Path of the remote actor inside StreamingApp's driver
        val url = s"akka.tcp://sparkDriver@$driverHost:$driverPort/user/Supervisor0/$actorName"

        // actorSelection only builds the path; it does not check that the actor exists
        val helloer = actorSystem.actorSelection(url)

        helloer ! "Hello"
        helloer ! "from"
        helloer ! "Spark Streaming"
        helloer ! "with"
        helloer ! "Scala"
        helloer ! "and"
        helloer ! "Akka"
    }
}

StreamingApp then logged that it had encountered dead letters. The detailed message is:

INFO LocalActorRef: Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://sparkDriver/deadLetters] to Actor[akka://sparkDriver/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkDriver%40111.22.33.444%3A56840-4#-2094758237] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
1 Answer

Following this article, http://typesafe.com/activator/template/spark-streaming-scala-akka, I changed how helloer is obtained, and it works now:

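import scala.concurrent.Await
import scala.concurrent.duration._

val timeout = 100.seconds

// Resolve the selection to a concrete ActorRef first; this fails with ActorNotFound if no actor answers in time
val helloer = Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)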
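
For anyone wanting to see the difference outside of Spark, here is a minimal standalone sketch. It assumes plain Akka (2.4 or later, for `terminate()`; on 2.3 it would be `shutdown()`) and Scala 2. The `Helloer` class, the `ResolveOneSketch` object, its `resolve` helper and the path `/user/nope` are made up for illustration and are not from StreamingApp. It only shows that `resolveOne` gives back a concrete `ActorRef` when the path exists and a failed result when it does not, whereas a bare `actorSelection(...) ! msg` gives the sender no such feedback. Whether that is the exact cause of the dead letters above is not something this sketch proves.

```scala
import akka.actor.{Actor, ActorNotFound, ActorRef, ActorSystem, Props}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical receiver, standing in for the "helloer" actor in StreamingApp.
class Helloer extends Actor {
  def receive = { case msg => println(s"got: $msg") }
}

object ResolveOneSketch {

  // Look up the path and block until an ActorRef is returned or the lookup fails.
  def resolve(system: ActorSystem, path: String, timeout: FiniteDuration): Try[ActorRef] =
    Try(Await.result(system.actorSelection(path).resolveOne(timeout), timeout))

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("sketch")
    system.actorOf(Props[Helloer], "helloer")
    val timeout = 5.seconds

    // Existing path: we get a concrete ActorRef that can be messaged.
    val found = resolve(system, "/user/helloer", timeout)
    found.foreach(_ ! "Hello")

    // Wrong path: the lookup fails with ActorNotFound instead of going unnoticed.
    val missing = resolve(system, "/user/nope", timeout)
    println(s"found: ${found.isSuccess}, missing failed: ${missing.isFailure}")

    Thread.sleep(500) // give the actor a moment to print before shutting down
    system.terminate()
  }
}
```

In the Spark case the same call is made against the remote `akka.tcp://...` URL, so a wrong host, port or actor path would surface as a failure at the `Await.result` line rather than only as log noise on the other side.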