We are using Kafka and want to use interactive queries to access the data in our state stores. We have an existing service that uses Akka HTTP to serve a REST API, and we want to integrate interactive queries into it.
It seemed like kafka-streams-query would be a perfect fit for this. However, it integrates into Akka HTTP by exposing a route property that uses the low-level API, which maps to a Flow[HttpRequest, HttpResponse, Any]. All of our existing code composes its endpoints with Akka HTTP's routing DSL.
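For context, here is a minimal sketch of how our existing routes are composed with the routing DSL. The route names and paths (pingRoute, statusRoute, "ping", "status") are hypothetical placeholders, not our real endpoints:

```scala
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

// Two small, hypothetical routes built with the high-level routing DSL.
val pingRoute: Route = path("ping") { get { complete("pong") } }
val statusRoute: Route = path("status") { get { complete("ok") } }

// `~` tries the first route and falls through to the second if it rejects.
val combined: Route = pingRoute ~ statusRoute
```

This works because both sides are Route values. The interactive query service hands back a Flow instead, so it cannot be dropped into a chain like this directly.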
I would expect code like the following to work, but it doesn't:
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow

implicit val system: ActorSystem = ActorSystem("web")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec = system.dispatcher
val firstRoutes: Route = ... // a populated route object
val lastRoutes: Route = ... // another populated route object
val iqServiceFlow: Flow[HttpRequest, HttpResponse, Any] = ... // code that returns the interactive query service
val firstFlow = Route.handlerFlow(firstRoutes)
val lastFlow = Route.handlerFlow(lastRoutes)
// The following code doesn't work, though everything I've seen online suggests it should
val handler = firstFlow via iqServiceFlow via lastFlow
Http().bindAndHandle(handler, "0.0.0.0", 8000)
How can I combine these flows in Akka Streams so that a single handler serves all of them?
Edit: Corrected the handler assignment statement.