Is it at all possible to get the last message on a Kafka Topic using Akka Streams Kafka? I'm creating a websocket which listens to a Kafka Topic, but currently it retrieves all prior unred messages when I connecting. This can add up to quite a lot of messages, so I'm only interrested in the last message + any future messages. (or only future messages)
The source:
def source(): Flow[Any, String, NotUsed] = {
val source = Consumer.plainSource(consumerSettings, Subscriptions.topics(MyTopic))
Flow.fromSinkAndSource[Any, String](Sink.ignore, source.map(_.value)
}
Consumer settings:
@Provides
def providesConsumerSettings(@Named("kafkaUrl") kafkaUrl: String): ConsumerSettings[String, String] = {
val deserializer = new StringDeserializer()
val config = configuration.getOptional[Configuration]("akka.kafka.consumer")
.getOrElse(Configuration.empty)
ConsumerSettings(config.underlying, deserializer, deserializer)
.withBootstrapServers(kafkaUrl)
.withGroupId(GroupId)
}
I've tried adding setting the ConsumerSettings.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
Which should "automatically reset the offset to the latest offset", but it does not seem to have any effect.