0 votes

I'm trying to design an Akka Stream using Alpakka to read events from a Kafka topic and put them into Couchbase.

So far I have the following code and it seems to work somehow:

Consumer
      .committableSource(consumerSettings, Subscriptions.topics(topicIn))
      .map(profile ⇒ {
        RawJsonDocument.create(profile.record.key(), profile.record.value())
      })
      .via(
        CouchbaseFlow.upsertDoc(
          sessionSettings,
          writeSettings,
          bucketName
        )
      )
      .log("Couchbase stream logging")
      .runWith(Sink.seq)

By "somehow" I mean that the stream is actually reads events from topic and put them to Couchbase as json documents and it looks even nice despite a fact that I don't understand how to commit consumer offsets to Kafka.

If I've correctly understood the main idea behind Kafka consumer offsets, then after any failure or restart the stream resumes reading from the last committed offset, and since we haven't committed any, it will probably re-read the records that were already processed in the previous session.

So am I right in my assumptions? If so, how should I handle consumer commits when reading from Kafka and publishing to a database? The official Akka Streams documentation provides examples showing how to deal with such cases using plain Kafka flows, so I have no idea how to commit the offsets in my case.

Many thanks!

Try passing the CommittableOffset through all the function calls and then use a Committer.sink to capture the offset and commit it back. - Dan W
So far I've mainly got the idea that I have to provide a CommittableOffset to the related sink using toMat or via(Committer.flow), but the problem is that I can't wrap my head around a possible implementation. - Alex Sergeenko
Why not use the Kafka Connect plugin offered by Couchbase? - OneCricketeer
@cricket_007 I wasn't aware of it, will take a look, thanks! - Alex Sergeenko

1 Answer

1 vote

You will need to store the offsets in Couchbase itself (rather than committing them to Kafka) in order to obtain "exactly-once" semantics.

This should help: https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#offset-storage-external-to-kafka
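
For reference, the pattern described on that page looks roughly like the sketch below. The helpers loadOffsetFromCouchbase and upsertDocWithOffset are hypothetical placeholders for however you keep the offset in Couchbase next to the data (ideally written atomically with the document itself); consumerSettings and topicIn are as in the question.

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.couchbase.client.java.document.RawJsonDocument
import org.apache.kafka.common.TopicPartition

import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("kafka-to-couchbase")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Hypothetical helpers: the offset lives in Couchbase alongside the data.
// loadOffsetFromCouchbase should return the next offset to read
// (i.e. the last stored offset + 1).
def loadOffsetFromCouchbase(partition: Int): Future[Long] = ???
def upsertDocWithOffset(doc: RawJsonDocument, offset: Long): Future[Done] = ???

val partition = 0
val done =
  Source
    .fromFuture(loadOffsetFromCouchbase(partition))
    .flatMapConcat { offset =>
      // Resume exactly where the external store says we left off,
      // bypassing Kafka's own committed offsets entirely.
      Consumer.plainSource(
        consumerSettings,
        Subscriptions.assignmentWithOffset(
          new TopicPartition(topicIn, partition) -> offset
        )
      )
    }
    .mapAsync(parallelism = 1) { record =>
      val doc = RawJsonDocument.create(record.key(), record.value())
      // Store the document and its offset together; if that write is
      // atomic, a replay after a failure cannot produce duplicates.
      upsertDocWithOffset(doc, record.offset())
    }
    .runWith(Sink.ignore)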
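
If at-least-once delivery is enough for you (duplicates possible after a restart, but nothing lost), you can instead keep the offsets in Kafka, as suggested in the comments: carry the CommittableOffset past the Couchbase write and commit it afterwards with Committer.sink. A rough sketch, assuming the Alpakka Couchbase CouchbaseSession API; double-check the exact signatures against your connector version:

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseSession
import akka.stream.scaladsl.Keep
import com.couchbase.client.java.document.RawJsonDocument

implicit val system: ActorSystem = ActorSystem("kafka-to-couchbase")
implicit val materializer: ActorMaterializer = ActorMaterializer()
import system.dispatcher

// consumerSettings, topicIn, sessionSettings, writeSettings and bucketName
// are assumed to be defined as in the question.
val sessionFuture = CouchbaseSession(sessionSettings, bucketName)

val control =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topicIn))
    .mapAsync(parallelism = 1) { msg =>
      val doc = RawJsonDocument.create(msg.record.key(), msg.record.value())
      // Upsert first and emit the offset only once the write has
      // succeeded: a crash in between causes a re-read, not a loss.
      sessionFuture
        .flatMap(_.upsertDoc(doc, writeSettings))
        .map(_ => msg.committableOffset)
    }
    .toMat(Committer.sink(CommitterSettings(system)))(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()

As a side note, Sink.seq from the question accumulates every upserted document in memory until the stream completes, which an unbounded Kafka source never does; Committer.sink, materialized together with the consumer Control as a DrainingControl, also gives you a clean way to shut the stream down.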