2
votes

Getting started with Akka Streams I want to perform a simple computation. Extending the basic QuickStart https://doc.akka.io/docs/akka/2.5/stream/stream-quickstart.html with a call to a restful web api:

val source: Source[Int, NotUsed] = Source(1 to 100)
source.runForeach(println)

already works nicely to print the numbers. But when trying to create an Actor to perform the HTTP request (is this actually necessary?) according to https://doc.akka.io/docs/akka/2.5.5/scala/stream/stream-integrations.html

  import akka.pattern.ask
  implicit val askTimeout = Timeout(5.seconds)
  val words: Source[String, NotUsed] =
    Source(List("hello", "hi"))

  words
    .mapAsync(parallelism = 5)(elem => (ref ? elem).mapTo[String])
    // continue processing of the replies from the actor
    .map(_.toLowerCase)
    .runWith(Sink.ignore)

I cannot get it to compile as the ? operator is not defined. As ar as I know this one would only be defined inside an actor. I also do not understand yet where exactly inside mapAsync my custom actor needs to be called.

edit

https://blog.colinbreck.com/backoff-and-retry-error-handling-for-akka-streams/ contains at least parts of an example. It looks like it is not mandatory to create an actor i.e.

implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()


val source = Source(List("232::03::14062::19965186", "232::03::14062::19965189"))
    .map(cellKey => {
      val splits = cellKey.split("::")
      val mcc = splits(0)
      val mnc = splits(1)
      val lac = splits(2)
      val ci = splits(3)
      CellKeySource(cellKey, mcc, mnc, lac, ci)
    })
    .limit(2)
    .mapAsyncUnordered(2)(ck => getResponse(ck.cellKey, ck.mobileCountryCode, ck.mobileNetworkCode, ck.locationArea, ck.cellKey)("<<myToken>>"))

  def getResponse(cellKey: String, mobileCountryCode:String, mobileNetworkCode:String, locationArea:String, cellId:String)(token:String): Future[String] = {
    RestartSource.withBackoff(
      minBackoff = 10.milliseconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2,
      maxRestarts = 2
    ) { () =>
      val responseFuture: Future[HttpResponse] =
        Http().singleRequest(HttpRequest(uri = s"https://www.googleapis.com/geolocation/v1/geolocate?key=${token}", entity = ByteString(
          // TODO use proper JSON objects
          s"""
             |{
             |  "cellTowers": [
             |    "mobileCountryCode": $mobileCountryCode,
             |    "mobileNetworkCode": $mobileNetworkCode,
             |    "locationAreaCode": $locationArea,
             |    "cellId": $cellId,
             |  ]
             |}
          """.stripMargin)))

      Source.fromFuture(responseFuture)
        .mapAsync(parallelism = 1) {
          case HttpResponse(StatusCodes.OK, _, entity, _) =>
            Unmarshal(entity).to[String]
          case HttpResponse(statusCode, _, _, _) =>
            throw WebRequestException(statusCode.toString() )
        }
    }
      .runWith(Sink.head)
      .recover {
        case _ => throw StreamFailedAfterMaxRetriesException()
      }
  }

val done: Future[Done] = source.runForeach(println)
done.onComplete(_ ⇒ system.terminate())

is already the (partial) answer for the question i.e. how to integrate Akka-streams + akka-http. However, it does not work, i.e. only throws error 400s and never terminates.

2

2 Answers

2
votes
  1. i think you already found an api how to call akka-http client

  2. regarding your first code snippet which doesn't work. i think there happened some misunderstanding of the example itself. you expected the code in the example to work after just copied. but the intension of the doc was to demonstrate just an example/concept, how you can delegate some long running task out of the stream flow and then consuming the result when it's ready. for this was used ask call to akka actor, because call to ask method returns a Future. probably the authors of the doc just omitted the definition of actor. you can try this one example:

    import java.lang.System.exit
    
    import akka.NotUsed
    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import akka.pattern.ask
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Sink, Source}
    import akka.util.Timeout
    
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.language.higherKinds
    
    object App extends scala.App {
    
      implicit val sys: ActorSystem = ActorSystem()
      implicit val mat: ActorMaterializer = ActorMaterializer()
    
      val ref: ActorRef = sys.actorOf(Props[Translator])
    
      implicit val askTimeout: Timeout = Timeout(5.seconds)
      val words: Source[String, NotUsed] = Source(List("hello", "hi"))
    
      words
        .mapAsync(parallelism = 5)(elem => (ref ? elem).mapTo[String])
        .map(_.toLowerCase)
        .runWith(Sink.foreach(println))
        .onComplete(t => {
          println(s"finished: $t")
          exit(1)
        })
    }
    
    class Translator extends Actor {
    
      override def receive: Receive = {
        case msg => sender() ! s"$msg!"
      }
    }
    
0
votes

You must import ask pattern from akka.

import akka.pattern.ask

Edit: OK, sorry, I can see that you have already imported. What is ref in your code? ActorRef?