I am currently using akka-stream and akka-http to build a file-streaming API. To stream data directly to the HTTP client, I pass a streaming source into the response entity like so:

complete(HttpEntity(ContentTypes.`application/octet-stream`, source))

However, if the stream fails for any reason, akka-http closes the connection without any explanation or logging.

I need two things:

  • How can I get the exception logs?
  • How can I notify my client with a message before closing the connection?

Thank you

I filed an issue about the error not being logged properly: github.com/akka/akka-http/issues/894. Unfortunately, sending any kind of status to the client is impossible with streaming. In most cases, akka-http is in the middle of transferring the response entity to the client when the error occurs. There is no mechanism in the HTTP protocol that would allow signalling an error to the client aside from resetting the connection (which is what we do). - jrudolph

1 Answer

As mentioned in the comment, the HTTP protocol does not allow signalling an error to the client once the response entity is being transferred.

As for logging: to me it boils down to akka-http lacking a proper access-log directive.

In my current project we have a decorator that registers an onComplete handler on the HTTP entity before handing it to akka-http for rendering.

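  import akka.Done
  import akka.http.scaladsl.model.{HttpEntity, HttpResponse, StatusCode, StatusCodes}
  import akka.stream.scaladsl.{Sink, Source}
  import akka.util.ByteString
  import scala.util.{Failure, Success, Try}

  // Calls `action` with the final status once the response entity has been fully streamed (or has failed).
  private def onResponseStreamEnd(response: HttpResponse)(action: StatusCode => Unit): HttpResponse =
    if (!response.status.allowsEntity() || response.entity.isKnownEmpty()) {
      // Nothing will be streamed, so report the status right away.
      action(response.status)
      response
    } else {
      val dataBytes =
        onStreamEnd(response.entity) { result =>
          val overallStatusCode =
            result match {
              case Success(_) =>
                response.status

              case Failure(e) =>
                // `logger` is not shown here; the (cause, message) order matches akka.event.LoggingAdapter.
                logger.error(e, s"error streaming response [${e.getMessage}]")
                StatusCodes.InternalServerError
            }

          action(overallStatusCode)
        }

      // Rebuild the entity around the tapped source, keeping the content length when it is known.
      response.withEntity(response.entity.contentLengthOption match {
        case Some(length) => HttpEntity(response.entity.contentType, length, dataBytes)
        case None         => HttpEntity(response.entity.contentType, dataBytes)
      })
    }

  // Attaches a side sink whose callback fires when the entity's byte stream completes or fails.
  private def onStreamEnd(entity: HttpEntity)(onComplete: Try[Done] => Unit): Source[ByteString, _] =
    entity.dataBytes.alsoTo { Sink.onComplete(onComplete) }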

Usage:

complete(onResponseStreamEnd(HttpResponse(StatusCodes.OK, HttpEntity(ContentTypes.`application/octet-stream`, source))){ statusCode => .... })
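
To see what the alsoTo(Sink.onComplete(...)) tap reports without starting an HTTP server, here is a minimal sketch. It assumes Akka 2.6 or later, where an implicit ActorSystem supplies the materializer. The names StreamEndTapDemo, tapCompletion and observe are made up for illustration; tapCompletion mirrors onStreamEnd but takes a plain Source instead of an HttpEntity.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.duration._
import scala.concurrent.{Await, Promise}
import scala.util.Try

object StreamEndTapDemo {
  // Hypothetical stand-in for onStreamEnd: the same tap, applied to a plain Source.
  def tapCompletion(data: Source[ByteString, _])(onComplete: Try[Done] => Unit): Source[ByteString, _] =
    data.alsoTo(Sink.onComplete(onComplete))

  // Drains the tapped stream and returns whatever the callback was given.
  def observe(data: Source[ByteString, _])(implicit system: ActorSystem): Try[Done] = {
    val observed = Promise[Try[Done]]()
    tapCompletion(data) { result => observed.trySuccess(result); () }
      .runWith(Sink.ignore) // the materialized Future is deliberately ignored
    Await.result(observed.future, 5.seconds)
  }

  def run(): (Try[Done], Try[Done]) = {
    // Assumption: Akka 2.6+, so the implicit system provides the materializer.
    implicit val system: ActorSystem = ActorSystem("stream-end-tap-demo")
    try {
      val healthy = Source(List("a", "b")).map(ByteString(_))
      // Emits the same two chunks, then fails, like a file read that breaks midway.
      val failing = healthy.concat(Source.failed[ByteString](new RuntimeException("disk read failed")))
      (observe(healthy), observe(failing))
    } finally Await.ready(system.terminate(), 10.seconds)
  }
}
```

Calling StreamEndTapDemo.run() should yield a Success for the healthy source and a Failure carrying the RuntimeException for the failing one, which is the value the decorator above matches on to pick the status code.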

A similar approach, using a custom graph stage instead, can be found here.
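
If only the logging half is needed, a lighter option than either decorator may be the built-in log operator of akka-stream, applied to the source before it is wrapped in the HttpEntity. The sketch below is an alternative technique, not the graph-stage approach referred to above. It assumes Akka 2.6 or later; LoggedSourceDemo and withFailureLogging are hypothetical names.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.Attributes
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object LoggedSourceDemo {
  // Hypothetical helper: logs failures coming from upstream at error level
  // and keeps per-element logging switched off.
  def withFailureLogging[M](name: String, data: Source[ByteString, M]): Source[ByteString, M] =
    data
      .log(name)
      .addAttributes(
        Attributes.logLevels(
          onElement = Attributes.LogLevels.Off,
          onFinish = Logging.DebugLevel,
          onFailure = Logging.ErrorLevel))

  def run(): Try[Done] = {
    // Assumption: Akka 2.6+, so the implicit system provides the materializer.
    implicit val system: ActorSystem = ActorSystem("logged-source-demo")
    try {
      val failing =
        Source.single(ByteString("a")).concat(Source.failed[ByteString](new RuntimeException("disk read failed")))
      // The operator passes elements and the failure through, so the stream still fails.
      val done = withFailureLogging("file-stream", failing).runWith(Sink.ignore)
      Try(Await.result(done, 5.seconds))
    } finally Await.ready(system.terminate(), 10.seconds)
  }
}
```

In the route this would look like HttpEntity(ContentTypes.`application/octet-stream`, withFailureLogging("file-stream", source)). It only sees failures that originate upstream of the operator, and it does not change what the client receives.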