We have an Akka application that consumes from a Kafka topic and sends each received message to an Akka actor. I am not sure that the way I programmed it uses all the benefits of the back-pressure mechanism built into Akka Streams.
The following is my configuration:
    val control: Consumer.DrainingControl[Done] =
      Consumer
        .sourceWithOffsetContext(consumerSettings, Subscriptions.topics("myTopic"))
        .map { consumerRecord =>
          // Route each record to the sharded actor for its ID (fire-and-forget tell)
          val myAvro = consumerRecord.value().asInstanceOf[MyAvro]
          val myActor = AkkaSystem.sharding.getMyActor(myAvro.getId)
          myActor ! Update(myAvro)
        }
        .via(Committer.flowWithOffsetContext(CommitterSettings(AkkaSystem.system.toClassic)))
        .toMat(Sink.ignore)(Consumer.DrainingControl.apply)
        .run()
This does what I expect for my business case: myActor receives the Update(MyAvro) commands.
I am more confused about the technical concept of back pressure. As far as I understand, back pressure is controlled partially by sinks, but in this stream configuration my sink is only 'Sink.ignore', so my sink is not doing anything for back pressure.
I am also curious about when the Akka Kafka stream commits the Kafka topic offset. Is it the moment the command is delivered to the mailbox of MyActor? If so, how can I handle scenarios like the ask pattern, where the Kafka offset should not be committed until the ask completes?
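To make the ask scenario concrete, here is a minimal, untested sketch of what I have in mind. Everything not in my code above is an assumption: `Update` gets a hypothetical `replyTo` field, `entityFor` is a hypothetical lookup standing in for `AkkaSystem.sharding.getMyActor`, and `MyAvro` is a stand-in case class for my generated Avro type. As far as I understand, `mapAsync` only emits an element once its future completes, so the committer would see an offset only after the actor has replied.

```scala
import akka.Done
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.adapter._
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.stream.scaladsl.Sink
import akka.util.Timeout
import scala.concurrent.duration._

// Stand-in for the generated Avro class (assumption, for illustration only).
final case class MyAvro(getId: String)

// Hypothetical command: replyTo lets the actor confirm that it processed the update.
final case class Update(payload: MyAvro, replyTo: ActorRef[Done])

object AskBeforeCommit {
  def run(
      consumerSettings: ConsumerSettings[String, MyAvro],
      entityFor: String => ActorRef[Update] // hypothetical lookup, e.g. via sharding
  )(implicit system: ActorSystem[_]): DrainingControl[Done] = {
    implicit val timeout: Timeout = 5.seconds

    Consumer
      .sourceWithOffsetContext(consumerSettings, Subscriptions.topics("myTopic"))
      // At most 4 asks are in flight; no further records are pulled until one completes.
      .mapAsync(parallelism = 4) { record =>
        val myAvro = record.value()
        entityFor(myAvro.getId).ask[Done](replyTo => Update(myAvro, replyTo))
      }
      // The offset travels as context and reaches the committer only after the reply.
      .via(Committer.flowWithOffsetContext(CommitterSettings(system.toClassic)))
      .toMat(Sink.ignore)(DrainingControl.apply)
      .run()
  }
}
```

If this is right, the back pressure would come from the `mapAsync` stage rather than from `Sink.ignore`, and an ask timeout would fail the stream before the corresponding offset is committed.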
I see some factory methods dealing with manual offset control, 'plainPartitionedManualOffsetSource' and 'committablePartitionedManualOffsetSource', but I can't find any examples for them. Can I decide in my business logic when to commit the offsets manually?
As an alternative configuration, I could use something like this:
    val myActor: ActorRef[MyActor.Command] = AkkaSystem.sharding.getMyActor
    val (control, result) =
      Consumer
        .plainSource(consumerSettings, Subscriptions.topics("myTopic"))
        .toMat(Sink.actorRef(AkkaSystem.sharding.getMyActor(myAvro.getId), null))(Keep.both)
        .run()
Now I have access to Sink.actorRef, so I think the back-pressure mechanism has a chance to control back pressure. Naturally this code will not work, because I have no idea how I can access 'myAvro' in this setup.
Thanks for any answers.