I am probably missing the point of the Kafka Consumer, but this is what I want to do:
A consumer subscribes to a topic, grabs all the messages in that topic, and returns a Future containing a list of those messages.
The code I have written to try to accomplish this is:
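    // Accumulates every incoming message into a List; the materialized
    // Future completes only when the upstream completes.
    val sink = Sink.fold[List[KafkaMessage], KafkaMessage](List[KafkaMessage]()) { (list, kafkaMessage) =>
      list :+ kafkaMessage
    }

    // Subscribes to the topic, maps each record to a KafkaMessage, and runs the stream into the sink.
    def consume(topic: String) =
      Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
        .map { message =>
          logger.info(s"Consuming ${message.record.value}")
          KafkaMessage(Some(message.record.key()), Some(message.record.value()))
        }
        .buffer(bufferSize, overflowStrategy)
        .runWith(sink)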
The Future never completes, though: the stream consumes the messages that are already in the topic and then keeps polling the topic indefinitely. Is there a way to complete the Future and then close the consumer?
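For what it's worth, the behaviour can be reproduced without Kafka, which suggests it comes from the source never completing rather than from the fold. The sketch below is only an illustration under assumptions: it assumes Akka 2.6 or later (so an implicit ActorSystem is enough to materialize the stream), uses `Source.tick` as a stand-in for the Kafka source, and the names `BoundedFoldSketch` and `collectWithin` are made up for this example. Bounding the endless source with `takeWithin` makes it complete, and then the Future from `Sink.fold` completes as well.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical stand-alone reproduction; not the Kafka code from the question.
object BoundedFoldSketch {
  // Stand-in for the Kafka source: it emits forever, like a topic subscription.
  private val endless = Source.tick(0.millis, 10.millis, "msg")

  // Same shape as the fold in the question, collecting elements into a List.
  private val collect =
    Sink.fold[List[String], String](List.empty[String])((acc, m) => acc :+ m)

  // takeWithin completes the stream once the window has elapsed,
  // which lets the fold emit its result and complete the Future.
  def collectWithin(window: FiniteDuration)(implicit system: ActorSystem): Future[List[String]] =
    endless.takeWithin(window).runWith(collect)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    try {
      val messages = Await.result(collectWithin(500.millis), 5.seconds)
      println(s"collected ${messages.size} messages")
    } finally system.terminate()
  }
}
```

Without the `takeWithin` step the same stream never completes, which matches what I see with the Kafka source. A time window is only a heuristic for "all messages", though, since a topic has no natural end, so I am not sure it is the right way to bound the consumer.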