I am using Alpakka Kafka in Scala to consume a Kafka topic. Here's my code:

    val kafkaConsumerSettings: ConsumerSettings[String, String] =
      ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers(kafkaConfig.server)
        .withGroupId(kafkaConfig.group)
        .withProperties(
          ConsumerConfig.MAX_POLL_RECORDS_CONFIG       -> "100",
          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG      -> "earliest",
          CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> "SSL"
        )

    Consumer
        .plainSource(kafkaConsumerSettings, Subscriptions.topics(kafkaConfig.topic))
        .runWith(Sink.foreach(println))

However, the consumer only starts polling from the first uncommitted message in the topic. I would like to always start from offset 0, regardless of which messages have been committed. With the Alpakka consumer, how do I specify the offset manually?

1 Answer

I think you want to add a couple of config entries:

  1. ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false", so your job never saves any offsets

  2. ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest", so your job starts from the beginning when it has no committed offset.
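As a rough sketch, the two entries could be wired into the settings like this. The `ReplaySettings` object and its `bootstrapServers` and `groupId` parameters are made up for illustration and stand in for the values from `kafkaConfig` in the question:

```scala
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical helper, not part of Alpakka Kafka.
object ReplaySettings {
  // Kafka client properties are passed as strings, hence "false" rather than a Boolean.
  val overrides: Map[String, String] = Map(
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG  -> "earliest"
  )

  // bootstrapServers and groupId are placeholders for kafkaConfig.server and kafkaConfig.group.
  def apply(system: ActorSystem, bootstrapServers: String, groupId: String): ConsumerSettings[String, String] =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(bootstrapServers)
      .withGroupId(groupId)
      .withProperties(overrides)
}
```

The resulting settings can be passed to `Consumer.plainSource` exactly as in the question.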

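If your job already committed offsets in the past, `auto.offset.reset` will not help, because it only applies when the consumer group has no committed offset. In that case you may have to reset the group's offsets to earliest, or switch to a new group id.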
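If resetting the group is not an option, another way to ignore committed offsets is to assign partitions manually and give an explicit starting offset with `Subscriptions.assignmentWithOffset`. This is only a sketch under a few assumptions: Akka 2.6+ (so the implicit `ActorSystem` supplies the materializer), a partition count known up front, and placeholder values for the broker address, group id and topic name. The `ReplayFromZero` object and `startOffsets` helper are hypothetical names:

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object ReplayFromZero {
  // Maps every partition of `topic` to offset 0.
  // partitionCount is assumed to be known; it is not looked up from the broker here.
  def startOffsets(topic: String, partitionCount: Int): Map[TopicPartition, Long] =
    (0 until partitionCount).map(p => new TopicPartition(topic, p) -> 0L).toMap

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("replay")

    val settings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // placeholder broker address
      .withGroupId("replay-group")            // placeholder group id

    // Manual assignment: the source seeks to the given offsets instead of the committed ones.
    Consumer
      .plainSource(settings, Subscriptions.assignmentWithOffset(startOffsets("my-topic", 3)))
      .runWith(Sink.foreach(println))
  }
}
```

Note that manual assignment bypasses consumer-group rebalancing, so each running instance reads every partition it is assigned.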