
We are using Kafka and are looking to use interactive queries to get access to data in our state stores. We have an existing service which uses Akka HTTP to serve up a REST API, and we wanted to integrate interactive queries into the flow.

It seemed like kafka-streams-query would be a perfect fit for this. However, it integrates with Akka HTTP by exposing a route property built on the low-level API, which maps to a Flow[HttpRequest, HttpResponse, Any]. All of our existing code composes its routes using Akka HTTP's routing DSL.

I would expect code like the following to work, but it doesn't:

implicit val system:ActorSystem = ActorSystem("web")
implicit val materializer:ActorMaterializer = ActorMaterializer()
implicit val ec = system.dispatcher

val firstRoutes:Route = ... //a route object populated
val lastRoutes:Route = ... //other route object populated

val iqServiceFlow:Flow[HttpRequest, HttpResponse, Any] = ...// code that returns the interactive query service

val firstFlow = Route.handlerFlow(firstRoutes)
val lastFlow = Route.handlerFlow(lastRoutes)

// The following code doesn't work though everything I've seen online suggests it should
val handler = firstFlow via iqServiceFlow via lastFlow

Http().bindAndHandle(handler, "0.0.0.0", 8000)

How can I combine flows in Akka Streams?

Edit: Corrected the handler assignment statement.

1 Answer


For clarity, let's start by making all the return types explicit:

val iqServiceFlow: Flow[HttpRequest, HttpResponse, Any] = ...
val firstFlow: Flow[HttpRequest, HttpResponse, NotUsed] = Route.handlerFlow(firstRoutes)
val lastFlow: Flow[HttpRequest, HttpResponse, NotUsed]  = Route.handlerFlow(lastRoutes)

Also, instead of...

val handler = firstRoutes via iqServiceFlow via lastFlow

...you probably meant:

val handler = firstFlow via iqServiceFlow via lastFlow

In order to chain flows together with via, the input and output types must match: that is, the output type of the first flow must be the same as the input type of the second flow, and so on. What you're trying to do with your handler is the following:

[HttpRequest, HttpResponse] // firstFlow
                   |
                   v
             [HttpRequest, HttpResponse] // iqServiceFlow
                                |
                                v
                          [HttpRequest, HttpResponse] // lastFlow

The output type of all your flows is HttpResponse, but their respective input types are all HttpRequest, so you cannot chain them together with via.
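
To see the constraint in isolation, here is a minimal sketch that uses toy Int and String flows as stand-ins for the HTTP flows. The names (ViaTypes, intToString, stringToLength) are made up for illustration, and it assumes akka-stream is on the classpath:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

object ViaTypes {
  // Hypothetical stand-ins: Int plays the role of the input type, String the output type.
  val intToString: Flow[Int, String, NotUsed] = Flow[Int].map(i => s"#$i")
  val stringToLength: Flow[String, Int, NotUsed] = Flow[String].map(_.length)

  // Compiles: intToString emits String, and stringToLength accepts String.
  val chained: Flow[Int, Int, NotUsed] = intToString.via(stringToLength)

  // Does not compile: intToString emits String, but its own input type is Int.
  // This is the same shape of mismatch as firstFlow via iqServiceFlow.
  // val broken = intToString.via(intToString)
}
```

Uncommenting the last line should produce a type mismatch error at compile time, which is the same kind of failure the handler in the question runs into.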

To chain your flows, you need a function that somehow converts an HttpResponse to an HttpRequest:

val respToReq: HttpResponse => HttpRequest = ...

You can create a flow from the above function:

val convertingFlow: Flow[HttpResponse, HttpRequest, NotUsed] = Flow.fromFunction(respToReq)

Now you can chain your flows:

val handler = firstFlow via convertingFlow via iqServiceFlow via convertingFlow via lastFlow

The types align as follows:

[HttpRequest, HttpResponse] // firstFlow
                   |
                   v
             [HttpResponse, HttpRequest] // convertingFlow
                                |
                                v
                           [HttpRequest, HttpResponse] // iqServiceFlow
                                              |
                                              v
                                        [HttpResponse, HttpRequest] // convertingFlow
                                                            |
                                                            v
                                                      [HttpRequest, HttpResponse] // lastFlow

The above assumes that you can reuse the same conversion function/flow. If this assumption doesn't hold, you can obviously create different conversion functions/flows.
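
As a rough end-to-end illustration of the same pattern, the sketch below bridges two toy flows with Flow.fromFunction and runs one element through the result. Everything here is hypothetical (Int and String stand in for HttpRequest and HttpResponse, and stringToInt stands in for respToReq); it assumes akka-stream on the classpath and uses ActorMaterializer to match the question's setup, which is deprecated in Akka 2.6:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ConvertingFlowSketch {
  // Two flows with the same shape, like firstFlow and iqServiceFlow.
  val first: Flow[Int, String, NotUsed] = Flow[Int].map(i => s"first:$i")
  val middle: Flow[Int, String, NotUsed] = Flow[Int].map(i => s"middle:$i")

  // Hypothetical conversion from the output type back to the input type.
  val stringToInt: String => Int = _.length
  val converting: Flow[String, Int, NotUsed] = Flow.fromFunction(stringToInt)

  // The types now line up: Int -> String -> Int -> String.
  val chained: Flow[Int, String, NotUsed] = first.via(converting).via(middle)

  // Runs a single element through the chained flow and waits for the result.
  def runOnce(input: Int)(implicit mat: Materializer): String =
    Await.result(Source.single(input).via(chained).runWith(Sink.head), 3.seconds)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try println(runOnce(42)) // prints "middle:8"
    finally system.terminate()
  }
}
```

Note that chaining with via sends every element through each stage in sequence, so the output of the first flow always becomes (after conversion) the input of the next one.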