
What is the advantage of using Source Streaming vs the regular way of handling requests? My understanding is that in both cases:

  1. The TCP connection will be reused
  2. Back-pressure will be applied between the client and the server

The only advantage of Source Streaming I can see is if there is a very large response and the client prefers to consume it in smaller chunks.

My use case is that I have a very long list of users (millions), and I need to call a service that performs some filtering on the users, and returns a subset.

Currently, on the server side I expose a batch API, and on the client, I just split the users into chunks of 1000, and make X batch calls in parallel using Akka HTTP Host API.
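
For illustration only, the batching approach described above could be sketched roughly as follows. This is not the actual client code: `filterBatch` is a hypothetical stand-in for one call to the batch API, and the `startsWith("a")` filter is a placeholder. Note that the full user list and the full result are both held in memory.

```scala
import scala.concurrent.{ExecutionContext, Future}

implicit val ec: ExecutionContext = ExecutionContext.global

// Hypothetical stand-in for a single batch API call; the real one would go over HTTP.
def filterBatch(batch: Seq[String]): Future[Seq[String]] =
  Future(batch.filter(_.startsWith("a")))

// Split the users into chunks of 1000, issue one call per chunk,
// and concatenate the per-chunk results in their original order.
def filterAll(users: Seq[String]): Future[Seq[String]] =
  Future
    .traverse(users.grouped(1000).toList)(filterBatch)
    .map(_.flatten)
```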

I am considering switching to HTTP streaming, but cannot quite figure out what the value would be.


1 Answer


You are missing one other huge benefit: memory efficiency. With a streamed pipeline (client to server and back to client), every party processes the data a chunk at a time, without running the risk of exhausting its memory. This is particularly useful on the server side, where you always have to assume that clients may do something malicious...

Client Request Creation

Suppose the ultimate source of your millions of users is a file. You can create a stream source from this file:

val userFilePath : java.nio.file.Path = ???

val userFileSource = akka.stream.scaladsl.FileIO.fromPath(userFilePath)

This source can be used to create your HTTP request, which will stream the users to the service:

import akka.http.scaladsl.model.HttpEntity.{Chunked, ChunkStreamPart}
import akka.http.scaladsl.model.{RequestEntity, ContentTypes, HttpRequest}

val httpRequest : HttpRequest = 
  HttpRequest(uri = "http://filterService.io", 
              entity = Chunked.fromData(ContentTypes.`text/plain(UTF-8)`, userFileSource))

This request will now stream the users to the service without reading the entire file into memory. Only a few chunks of data are buffered at a time, so you can send a request with a potentially unbounded number of users and your client will be fine.
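
The same idea should carry over when the users do not live in a file. A minimal sketch, assuming one user per line on the wire; `userIterator` is a hypothetical stand-in for any lazily evaluated source such as a database cursor:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.util.ByteString

// Hypothetical stand-in for a lazily evaluated user source (e.g. a database cursor).
def userIterator(): Iterator[String] = Iterator("alice", "bob", "carol")

// One ByteString per user, newline-terminated so the receiver can split them apart again.
val userByteSource: Source[ByteString, NotUsed] =
  Source
    .fromIterator(() => userIterator())
    .map(user => ByteString(user + "\n"))
```

`userByteSource` could then be passed to `Chunked.fromData` in place of `userFileSource`.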

Server Request Processing

Similarly, your server can be designed to accept a request with an entity that can potentially be of infinite length.

Your question says the service will filter the users. Assuming we have a filtering function:

val isValidUser : (String) => Boolean = ???

This can be used to filter the incoming request entity and create a response entity which will feed the response:

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model.{ContentTypes, HttpResponse}
import akka.http.scaladsl.model.HttpEntity.Chunked
import akka.stream.scaladsl.Source
import akka.util.ByteString

val route = extractDataBytes { userSource =>
  val responseSource : Source[ByteString, _] = 
    userSource
      .map(_.utf8String)
      .filter(isValidUser)
      .map(ByteString.apply)

  complete(HttpResponse(entity=Chunked.fromData(ContentTypes.`text/plain(UTF-8)`, 
                                                responseSource)))
}
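
One caveat: the chunks in a data-bytes stream are not guaranteed to line up with user boundaries, so a single user may be split across two chunks. A small sketch of how `Framing.delimiter` can recover whole users, assuming they are newline-delimited; the 1024-byte frame limit is an arbitrary choice for this example, and the implicit `ActorSystem` acting as materializer assumes Akka 2.6 or later:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("framing-sketch")

// Two chunks whose boundary falls in the middle of the user "bob".
val chunks = Source(List(ByteString("alice\nbo"), ByteString("b\ncarol\n")))

// Re-assemble complete lines regardless of where the chunk boundaries fall.
val users = chunks
  .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
  .map(_.utf8String)

// Collecting into a Seq is only for demonstration; a real pipeline would keep streaming.
val result = Await.result(users.runWith(Sink.seq), 5.seconds)

system.terminate()
```

In the route above, the same `.via(Framing.delimiter(...))` step would sit before `.map(_.utf8String)`, and each surviving user would need its newline appended again before being written to the response.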

Client Response Processing

The client can similarly process the filtered users without reading them all into memory. We can, for example, dispatch the request and send all of the valid users to the console:

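import akka.actor.ActorSystem
import akka.http.scaladsl.Http

// the implicit ActorSystem runs the request and, in Akka 2.6+, also materializes the stream
implicit val system : ActorSystem = ActorSystem()
import system.dispatcher // ExecutionContext for flatMap

Http()
  .singleRequest(httpRequest)
  .flatMap { response =>
    response
      .entity
      .dataBytes
      .map(_.utf8String)
      .runForeach(System.out.println) // runForeach actually runs the stream
  }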