1
votes

My current code has signature:

def putObject(key: String, contentType: ContentType, data: Source[ByteString,_]): Future[HttpReponse]

(which simply delegates to akka-http client)

I would like the signature to be

def putObject(key: String, contentType: ContentType): Sink[ByteString, Future[HttpReponse]]

How do I create a sink that consumes all ByteStrings as the body for a single HttpRequest without buffering all bytes in memory?

1

1 Answers

0
votes

Inverting the control of what runs the stream can be impossible. In the first API a user gives a Source to the implementation of the library and gives the library the control over when that Source will be ran.

The second API returns the Sink instead. This also means that the control of running the stream is given back to the user. That is unsupported, because now some part of the stream needs to be ran by the user and another part of the same stream by Http library.

What you could do is changing the API to be:

def putObject(key: String, contentType: ContentType): Sink[Source[ByteString, NotUsed], Future[HttpResponse]] =

And the implementation then would be like:

val conn = Http().outgoingConnection(host = ???)
val promise = Promise[HttpResponse]
Flow[Source[ByteString, NotUsed]]
  .map(data => HttpRequest(entity = HttpEntity(contentType, data)))
  .via(conn)
  .toMat(Sink.head)(Keep.right)