I want my Lagom subscriber-only service to subscribe to a Kafka Topic and stream the messages to a websocket. I have a service defined as follows using this documentation (https://www.lagomframework.com/documentation/1.4.x/scala/MessageBrokerApi.html#Subscribe-to-a-topic) as a guideline:
// service call
def stream(): ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]
// service implementation
override def stream() = ServiceCall { req =>
req.runForeach(str => log.info(s"client: %str"))
kafkaTopic().subscribe.atLeastOnce(Flow.fromFunction(
// add message to a Source and return Done
))
Future.successful(//some Source[String, NotUsed])
However, I can't quite figure out how to handle my kafka message. The Flow.fromFunction
returns [String, Done, _]
and implies that I need to add those messages (strings) to a Source that has been created outside of the subscriber.
So my question is twofold: 1) How do I create an akka stream source to receive messages from a kafka topic subscriber at runtime? 2) How do I append kafka messages to said source while in the Flow?
req.runForeach(str => log.info(s"client: %str"))
will exhaust your entire input stream. - erip