I'm starting with Akka Streams and would like to build a server as a stream that receives an Http.IncomingConnection and sends the received message to Kafka through a plainSink.

I declared my source as follows:

val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
    Http().bind(interface = "localhost", port = 8080)

Then I want to extract the message (a String) from the HttpRequest's body and, finally, send that String to Kafka. The flow would look something like this:

val bindingFuture: Future[Http.ServerBinding] = serverSource
     .map(???) //Here, I need to extract the message
     .map(message => new ProducerRecord[String, String](topic, message.result(2 seconds)))
     .runWith(akka.kafka.scaladsl.Producer.plainSink(producerSettings))

But I don't know how to extract the message. I would like to do something like this:

val requestHandler: HttpRequest => HttpResponse = {
    case HttpRequest(POST, Uri.Path("/publish"), _, _, _) => {
      HttpResponse(202, entity = "Message sent to Kafka!")
    }
    case r: HttpRequest =>
      r.discardEntityBytes() // important to drain incoming HTTP Entity stream
      HttpResponse(404, entity = "Unknown resource!")
  }

But with connection handleWithSyncHandler requestHandler, I can't pass the message on to the rest of the stream. I would also like to accept any request under the /publish URI and return 404 otherwise, all inside the stream.

Is it possible to do this?

1 Answer

Use Directives Instead

The Routing DSL would be easier to use than trying to process the HttpRequest by hand:

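import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import scala.concurrent.duration._

// Requires an implicit Materializer and ExecutionContext in scope.
val route: Route =
  post {
    path("publish") {
      extractRequestEntity { entity =>
        // Buffer the whole request body (10-second timeout) and decode it as UTF-8.
        onSuccess(entity.toStrict(10.seconds).map(_.data.utf8String)) { message =>
          // plainSink is a Sink, so feed it from a one-element Source.
          val sent = Source.single(new ProducerRecord[String, String](topic, message))
            .runWith(Producer.plainSink(producerSettings))
          // Reply only after the record has been handed to Kafka.
          onSuccess(sent) { _ => complete(StatusCodes.OK) }
        }
      }
    }
  }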

This can now be passed in to process incoming requests:

Http().bindAndHandle(
  route,
  "localhost",
  8080
)
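
Running Producer.plainSink once per request would create a Kafka producer for every message. A minimal sketch of one alternative, under some assumptions: a single long-running stream is materialized from Source.queue into the Kafka sink, and the route only offers each message to that queue. The object name PublishServer, the helper toRecord, the topic "messages", the broker address "localhost:9092", the buffer size and the overflow strategy are all placeholders, not taken from the question. The sketch assumes the same API generation as the snippets above (ActorMaterializer and bindAndHandle, both deprecated in newer Akka releases).

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Source, SourceQueueWithComplete}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

object PublishServer {
  // Hypothetical topic name and broker address; replace with your own.
  val topic = "messages"
  val bootstrapServers = "localhost:9092"

  // Wrap a message body in a record with no key.
  def toRecord(message: String): ProducerRecord[String, String] =
    new ProducerRecord[String, String](topic, message)

  // POST /publish offers the request body to the shared queue.
  def route(kafkaQueue: SourceQueueWithComplete[String]): Route =
    (post & path("publish")) {
      entity(as[String]) { message =>
        onSuccess(kafkaQueue.offer(message)) {
          case QueueOfferResult.Enqueued =>
            complete(StatusCodes.Accepted, "Message sent to Kafka!")
          case _ =>
            // Buffer full (message dropped), or the stream failed or closed.
            complete(StatusCodes.ServiceUnavailable, "Message could not be queued")
        }
      }
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("publish-server")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val producerSettings =
      ProducerSettings(system, new StringSerializer, new StringSerializer)
        .withBootstrapServers(bootstrapServers)

    // One stream, materialized once: queue -> ProducerRecord -> Kafka.
    val kafkaQueue = Source.queue[String](256, OverflowStrategy.dropNew)
      .map(toRecord)
      .to(Producer.plainSink(producerSettings))
      .run()

    Http().bindAndHandle(route(kafkaQueue), "localhost", 8080)
  }
}
```

Note that Accepted here only means the message was queued, not that Kafka has acknowledged it. Requests that the route does not match are rejected, and the default rejection handling applied by bindAndHandle turns an unmatched path into a 404 response, so no explicit catch-all case should be needed.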