4
votes

I'm aware that as of Akka 2.4.16, there is no "remote" implementation of Reactive Streams. The specification focuses on a stream running on a single JVM.

However, consider a use case that involves another JVM for some processing while maintaining back pressure. The idea is to have a main application that provides a user interface and runs a stream. For instance, this stream has a stage performing heavy computations that should run on a different machine. I'm interested in ways to run streams in a distributed way. I came across some articles pointing out some ideas:

What other alternatives are there? Are there any significant downsides to the above? Any special characteristics to consider?

Update: This question is not limited to a single use case. I'm generally interested in all possible ways to work with streams in a distributed environment. For example, it could involve a single stream that integrates actors with .mapAsync, or two separate streams on two machines communicating via Akka HTTP. The only requirement is that back pressure has to be enforced among all components.

2
I think you are misunderstanding something. So, how can you have an inter-JVM stream? By having components that actually reside in different JVMs. In this particular case those components will be actors. You just need to create a FlowShape/Sink/Source with some remote actor, and Artery will take care of the messaging. - sarveshseri
I totally agree with your comment: according to the blog post, Artery maintains back pressure when these two actors communicate with each other. My question is rather whether, for example, using .mapAsync to integrate remote actors into a stream has the same result: a stream that processes something on a different machine. More generally: what are the ways to implement streams that cross JVM boundaries? - Toaditoad

2 Answers

1
votes

Well, it seems I will have to add an example for that. One thing you need to understand is that back pressure is handled by the async boundaries in GraphStages. It has nothing to do with whether a component lives somewhere else. It also does not depend on Artery, which is just the new remote transport.

Here is probably the simplest example of a cross-JVM stream.

First application:

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

// Logs every incoming message and echoes it back to the sender
class MyActor extends Actor with ActorLogging {
  override def receive: Receive = {
    case msg =>
      log.info(msg.toString)
      sender() ! msg
  }
}

object MyApplication extends App {

  val config = ConfigFactory.parseString(
    """
      |akka{
      |  actor {
      |    provider = remote
      |  }
      |  remote {
      |    enabled-transports = ["akka.remote.netty.tcp"]
      |    untrusted-mode = off
      |    netty.tcp {
      |      hostname="127.0.0.1"
      |      port=18000
      |    }
      |  }
      |}
    """.stripMargin
  )

  val actorSystem = ActorSystem("my-actor-system", config)

  val myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor")

}

The second application actually runs the stream, which uses the actor from the first application:

import akka.actor.{ActorPath, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.pattern.ask
import com.typesafe.config.ConfigFactory

import scala.language.postfixOps
import scala.concurrent.duration._

object YourApplication extends App {

  val config = ConfigFactory.parseString(
    """
      |akka{
      |  actor {
      |    provider = remote
      |  }
      |  remote {
      |    enabled-transports = ["akka.remote.netty.tcp"]
      |    untrusted-mode = off
      |    netty.tcp {
      |      hostname="127.0.0.1"
      |      port=19000
      |    }
      |  }
      |}
    """.stripMargin
  )

  val actorSystem = ActorSystem("your-actor-system", config)

  import actorSystem.dispatcher

  val logger = actorSystem.log

  implicit val implicitActorSystem = actorSystem
  implicit val actorMaterializer = ActorMaterializer()

  val myActorPath = ActorPath.fromString("akka.tcp://my-actor-system@127.0.0.1:18000/user/my-actor")

  val myActorSelection = actorSystem.actorSelection(myActorPath)

  val source = Source(1 to 10)

  // mapAsync runs the given Int => Future[Int] function as a stream stage;
  // parallelism = 2 means at most two asks are in flight at any time
  val myRemoteComponent = Flow[Int].mapAsync(2)(i => {
    myActorSelection.resolveOne(1 seconds).flatMap(myActorRef =>
      myActorRef.ask(i)(1 seconds).mapTo[Int]
    )
  })

  val sink = Sink.foreach[Int](i => logger.info(i.toString))

  val stream = source.via(myRemoteComponent).toMat(sink)(Keep.right)

  val streamRun = stream.run()

}
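
To see in isolation how mapAsync bounds the work in flight, here is a minimal single-JVM sketch, not the setup above. The names EchoActor and BoundedAskSketch are made up for illustration, and the echo actor is created locally as a stand-in for the remote one; the assumption is that in the two-JVM setup `worker` would instead be an ActorRef resolved once from the actor selection. It assumes akka-actor and akka-stream (2.4/2.5 style API) are on the classpath.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for the remote MyActor: echoes each message back.
class EchoActor extends Actor {
  override def receive: Receive = {
    case msg => sender() ! msg
  }
}

object BoundedAskSketch {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("sketch-system")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val askTimeout: Timeout = Timeout(1.second)
    import system.dispatcher

    try {
      // Assumption: with remoting, this ref would come from
      // actorSelection.resolveOne(...), resolved once before the stream starts.
      val worker: ActorRef = system.actorOf(Props(new EchoActor), "my-actor")

      // At most 2 asks are outstanding; the source is only pulled again
      // once a reply arrives. mapAsync also preserves element order.
      val result: Future[Seq[Int]] =
        Source(1 to 10)
          .mapAsync(parallelism = 2)(i => (worker ? i).mapTo[Int])
          .runWith(Sink.seq)

      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Calling `BoundedAskSketch.run()` returns the numbers 1 to 10 in their original order. Resolving the ActorRef once, rather than per element as in the answer's code, is a design choice of this sketch; it avoids one lookup per message but means a restarted remote actor has to be resolved again.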
0
votes

In Akka 2.5.10 and above, you can use StreamRefs for this. StreamRefs are designed for exactly this use case and are particularly suitable for remote work queues, because they apply back pressure until the stream attached to them locally can accept more work.
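
A rough sketch of what this could look like, assuming the Akka 2.5.x API, where StreamRefs.sourceRef() materializes a Future[SourceRef[T]] (in Akka 2.6 the materialized value is the SourceRef itself, so the code would need adjusting). The object name StreamRefSketch is made up, and for brevity both ends run in one JVM; in a real deployment the SourceRef would be sent inside a message to an actor on another node, which would then run `ref.source` there.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, SourceRef}
import akka.stream.scaladsl.{Sink, Source, StreamRefs}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object StreamRefSketch {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("streamref-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    try {
      // Producing side: expose a local source as a SourceRef.
      // Assumption: Akka 2.5.x, where this materializes a Future.
      val refF: Future[SourceRef[Int]] =
        Source(1 to 10).runWith(StreamRefs.sourceRef[Int]())

      // Consuming side: normally on another node after receiving the ref
      // in a message. Demand signalled here flows back to the producer.
      val result: Future[Seq[Int]] =
        refF.flatMap(ref => ref.source.runWith(Sink.seq))

      Await.result(result, 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

The SinkRef direction works the same way in reverse: the consuming side hands out a reference to its sink (via StreamRefs.sinkRef()), and the remote producer streams into it.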