4
votes

I've looked through the examples at https://doc.akka.io/docs/akka-http/current/introduction.html for Akka HTTP routing and strangely for something built on top of Akka Streams none of the examples connect to a stream.

Can somebody show a simple example of creating a Java DSL flow (not Scala please), and then connecting a Route directly to that flow?

Or am I missing the point and it's not possible but requires some CompletionStage code within the Route to wait for a result of glue code that calls a Flow?

Edit: to clarify the flow can do something like append a string to a posted request body.

2
Could you please clarify what you want that stream to be doing? The http entity is a stream of bytes, your response can be a stream. And there’s various other ways to connect depending on your use case - are you streaming data in, out, or through? From where etc. Some examples are here doc.akka.io/docs/akka-http/current/…Konrad 'ktoso' Malawski

2 Answers

3
votes

Using akka streams to complete a route is definitely possible. It involves either:

  • a web socket route, see examples in the docs, or
  • a chunked http response (since you typically do not know the size of the response if it's fed from a stream). You can create a Chunked Entity from an akka stream Source of ByteStrings
  • you can also use other response types if the response size is known in advance, see docs for HttpEntity about their specifics
2
votes

Edit: to clarify the flow can do something like append a string to a posted request body.

Michał's answer contains good links, so please give them a read. Akka HTTP is by default and always streaming with its data -- e.g. the entities. So for example to do a streaming "echo" which at the same time adds a suffix, you could do something like this:

path("test", () ->
  // extract the request entity, it contains the streamed entity as `getDataBytes`
  extractRequestEntity(requestEntity -> {

  // prepare what to add as suffix to the incoming entity stream:
  Source<ByteString, NotUsed> suffixSource = 
    Source.single(ByteString.fromString("\n\nADDS THIS AFTER INCOMING ENTITY"))

  // concat the suffix stream to the incoming entity stream
  Source<ByteString, Object> replySource = requestEntity.getDataBytes()
    .concat(suffixSource);

    // prepare and return the entity:
    HttpEntity.Chunked replyEntity = HttpEntities.create(ContentTypes.TEXT_PLAIN_UTF8, replySource);
    return complete(StatusCodes.OK, replyEntity);
  })
);

Having that said, there is numerous ways to make use of the streaming capabilities, including framed JSON Streaming and more. You should also give the docs page about implications of streaming a read.