I have a requirement where a client calls a POST REST endpoint built with Akka HTTP. As soon as the request reaches the POST handler, I need to pass the posted object to a stream (a source, several flows, and a sink) and get the result back from the sink so that I can return it to the client as the response.

I have been going through some articles and have seen the code below, but my concern is that it materializes the stream for every request. I want to materialize the stream only once and keep passing elements to it.

Below is the high-level version of what I saw:

val route: Route =
  path("dummy path") { p =>
    get {
      (extract(_.request) & extractMaterializer) { (req, mat) ⇒
        Source.single(req).runWith(sink)(mat)

        complete {
          s"<h1>Say hello to akka-http. p=$p</h1>"
        }
      }
    }
  }

I was thinking of creating an actor and passing the object to it. I can create a source with Source.actorRef and connect several flows to it, but I am not sure how to get the response back from the sink. Something like:

    val actor: ActorRef = some actor

    Source.actorRef(actor).via(flows).to(Sink).run() // stream materialized once

    val route: akka.http.scaladsl.server.Route =
      path("post" / Segment) { p =>
        post {

          (extract(_.request) & extractMaterializer) { (req, mat) ⇒
            val response = actor.ask(message) // get back the response

            complete {
              response
            }
          }
        }
      }

Or is there another approach I could use for my use case?

What is the output of your processing? Do you want to start streaming out the result while the data is still coming in? - jrudolph

Hi jrudolph, the request is the HTTP POST request, and the response is an object (a plain POJO) marshalled to JSON and returned to the user through the same HTTP call. I am now using the Akka HTTP low-level DSL for my scenario (where I want to materialize my stream only once), and it is working as expected. However, when the client app makes multiple HTTP calls, my stream sometimes hangs and times out, and no response is returned. This is because, in the timeout case, an HttpEntity.Default is returned rather than a Strict one. I tried using .toStrict(), but the problem persists. Am I doing something wrong? - Kiras

Hi jrudolph, regarding your question about whether I want to start streaming out the result while the data is still coming in: I believe not. Only once I have the HTTP request do I want to process it further in the flows. Once the processing is done, the response is streamed back. - Kiras

1 Answer

I guess what you want is to run the request processing through a stream that is materialized only once and to send the response back to the user from that stream. A queue source with an actor in between can do the job:

import java.util.concurrent.TimeUnit

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives.{
  get,
  onSuccess,
  pathEnd,
  pathPrefix
}
import akka.pattern.ask
import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.util.Timeout
import akka.http.scaladsl.server.directives.RouteDirectives.complete
import com.typesafe.config.ConfigFactory

import scala.concurrent.ExecutionContext

object TestApp2 extends App {

  implicit val actorSystem = ActorSystem("test-system")
  implicit val mat = ActorMaterializer()
  implicit val ec = mat.executionContext

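  // Materialize the stream once; every element carries the ActorRef to reply to.
  val streamSource = Source
    .queue[(Message, ActorRef)](100, OverflowStrategy.dropNew)
    .map { p =>
      // do the actual processing here
      println("I am processing request")
      ("It works", p._2)
    }
    .toMat(Sink.foreach { resp =>
      // reply to the sender captured in InternalActor (the pending ask)
      resp._2 ! resp._1
    })(Keep.left)
    .run()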
  implicit val timeout = Timeout(
    10000,
    TimeUnit.MILLISECONDS
  )

  val internalActor =
    actorSystem.actorOf(Props(new InternalActor(streamSource)))
  Http(actorSystem)
    .bindAndHandle(
      getRoutes(internalActor),
      "0.0.0.0",
      8080
    )

  def getRoutes(
      internalActor: ActorRef
  )(implicit mat: ActorMaterializer, ec: ExecutionContext, timeout: Timeout) = {
    pathPrefix("healthcheck") {
      get {
        pathEnd {
          val responseReturned = internalActor ? Message()
          onSuccess(responseReturned) {
            case response: String =>
              complete(response)
            case _ => complete("error")
          }
        }
      }
    }
  }
}

case class Message()

class InternalActor(streamSource: SourceQueueWithComplete[(Message, ActorRef)])(
    implicit ec: ExecutionContext
) extends Actor {

  override def receive: Receive = {
    case m: Message =>
      val senderRef = sender()
      streamSource.offer((m, senderRef)).map {
        case QueueOfferResult.Enqueued => // accepted; the sink will reply later
        case QueueOfferResult.Dropped => senderRef ! "error" // buffer full, element was dropped
        case QueueOfferResult.Failure(ex) => senderRef ! "error" // the stream has failed
        case QueueOfferResult.QueueClosed => senderRef ! "error" // the stream has completed
      }
  }
}

curl 'http://localhost:8080/healthcheck'

It works
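
If the actor is only there to carry the reply back, a variation on the same idea is to put a Promise into each queue element and complete it in the sink. The following is a minimal sketch under that assumption, not a drop-in replacement: the names PromiseQueueSketch and process are hypothetical, the toUpperCase step stands in for the real flows, and ActorMaterializer is used only to match the code above (it is deprecated in newer Akka versions).

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}

import scala.concurrent.{ExecutionContext, Future, Promise}

// Hypothetical name; illustrates the "materialize once, reply via Promise" idea.
object PromiseQueueSketch {
  implicit val system: ActorSystem = ActorSystem("sketch-system")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: ExecutionContext = system.dispatcher

  // Materialized exactly once. Each element is the input plus the Promise
  // that the caller is waiting on.
  private val queue = Source
    .queue[(String, Promise[String])](100, OverflowStrategy.dropNew)
    .map { case (in, p) => (in.toUpperCase, p) } // stand-in for the real flows
    .toMat(Sink.foreach[(String, Promise[String])] { case (out, p) =>
      p.success(out) // hand the result back to the waiting caller
    })(Keep.left)
    .run()

  // Offer one element to the running stream and return its eventual result.
  // Note: if a stage throws, the whole stream fails and later offers fail too.
  def process(in: String): Future[String] = {
    val p = Promise[String]()
    queue.offer((in, p)).flatMap {
      case QueueOfferResult.Enqueued => p.future
      case other => Future.failed(new RuntimeException(s"not enqueued: $other"))
    }
  }
}
```

In a route, the returned Future can be used the same way as the ask result above, for example onSuccess(PromiseQueueSketch.process(body)) { resp => complete(resp) }. The trade-off is the same as with the actor: with dropNew, a full buffer turns into a failed Future that the route has to handle.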