I'm starting with Akka Streams and I would like to build a server as Stream which recives a Http.IncomingConnection
and sends the message recived to Kafka as plainSink.
I declared my source as given below:
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
Http().bind(interface = "localhost", port = "8080")
Then, I want to extract the message (String) from the HttpRequest's body and, finally, send it the String to Kafka. The flow is something like given below:
val bindingFuture: Future[Http.ServerBinding] = serverSource
.map(???) //Here, I need to extract the message
.map(message => new ProducerRecord[String, String](topic, message.result(2 seconds)))
.runWith(akka.kafka.scaladsl.Producer.plainSink(producerSettings))
But, I don't know how to extract the message. I would like to do something like this:
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(POST, Uri.Path("/publish"), _, _, _) => {
HttpResponse(202, entity = "Message sent to Kafka!")
}
case r: HttpRequest =>
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
HttpResponse(404, entity = "Unknown resource!")
}
But, using connection handleWithSyncHandler requestHandler
I can't get the message to follow with the stream process. And, also, I would like to get any request under /publish
URI, or return 404 in other case inside the stream.
Is it possible to do this?