
We have an Akka application that consumes from a Kafka topic and sends each received message to an Akka actor. I am not sure that the way I programmed it uses all the benefits of the back-pressure mechanism built into Akka Streams.

Following is my configuration...

val control: Consumer.DrainingControl[Done] =
  Consumer
    .sourceWithOffsetContext(consumerSettings, Subscriptions.topics("myTopic"))
    .map { consumerRecord =>
      val myAvro = consumerRecord.value().asInstanceOf[MyAvro]

      // Look up the sharded actor responsible for this record's id
      val myActor = AkkaSystem.sharding.getMyActor(myAvro.getId)

      // Fire-and-forget send to the actor
      myActor ! Update(myAvro)
    }
    .via(Committer.flowWithOffsetContext(CommitterSettings(AkkaSystem.system.toClassic)))
    .toMat(Sink.ignore)(Consumer.DrainingControl.apply)
    .run()

This does what I expect for my business case: myActor receives the Update(MyAvro) commands.

I am more confused by the technical concept of back pressure. As far as I understand, the back-pressure mechanism is controlled partially by Sinks, but in this stream configuration my Sink is only 'Sink.ignore'. So my Sink is not doing anything for back pressure.

I am also curious when Akka Kafka Streams commits the Kafka topic offset. Is it the moment the command is delivered to the mailbox of MyActor? If so, how can I handle scenarios like the ask pattern, where the Kafka offset should not be committed until the ask completes?

I see some factory methods dealing with manual offset control, 'plainPartitionedManualOffsetSource' and 'committablePartitionedManualOffsetSource', but I can't find any examples for them. Can I decide in my business logic when to commit the offsets manually?

As an alternative configuration, I could use something like this:

val myActor: ActorRef[MyActor.Command] = AkkaSystem.sharding.getMyActor
val (control, result) =
  Consumer
    .plainSource(consumerSettings, Subscriptions.topics("myTopic"))
    .toMat(Sink.actorRef(AkkaSystem.sharding.getMyActor(myAvro.getId), null))(Keep.both)
    .run()

Now that I have access to Sink.actorRef, I think the back-pressure mechanism has a chance to control the flow. Naturally this code will not work, because I have no idea how to access 'myAvro' in this setup.

Thanks for any answers.


2 Answers

1
votes

In the first stream, there will be basically no backpressure. The offset commit will happen very soon after the message gets sent to myActor.

For backpressure, you'll want to wait for a response from the target actor, and as you say, the ask pattern is the canonical way to accomplish that. An ask of an actor from outside an actor results in a Future (for all intents and purposes a stream is outside of an actor; that the stages are executed by actors is an implementation detail), which suggests that mapAsync is called for.

def askUpdate(m: MyAvro): Future[Response] = ???  // get actorref from cluster sharding, send ask, etc.

You would then replace the map in your original stream with

.mapAsync(parallelism) { consumerRecord =>
  askUpdate(consumerRecord.value().asInstanceOf[MyAvro])
}

mapAsync limits the "in-flight" futures to parallelism. If there are already parallelism futures in flight (spawned by it, of course), it will backpressure. If a spawned future completes with a failure (for the ask itself, this will generally be a timeout), mapAsync will fail the stream; the results of the successful futures will be passed on in the order of the incoming elements (very often, these will be akka.Done, especially when the only things left to do in the stream are the offset commit and Sink.ignore).
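Put together, the stream from the question could be wired roughly as below. This is only a sketch under assumptions: MyAvro and Update here are simplified hypothetical stand-ins for the question's types, the entity key name "MyActor", the 5-second timeout and parallelism = 4 are arbitrary choices, and the sharded entity is assumed to be initialised elsewhere and to reply with Done once it has handled the update. Because the committer sits downstream of mapAsync, an offset only reaches it after the corresponding ask has completed.

```scala
import akka.Done
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityTypeKey}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.stream.scaladsl.Sink
import akka.util.Timeout

import scala.concurrent.Future
import scala.concurrent.duration._

object AskBeforeCommitSketch {
  // Hypothetical stand-ins for the question's MyAvro record and Update command.
  final case class MyAvro(id: String)
  final case class Update(payload: MyAvro, replyTo: ActorRef[Done])

  // Hypothetical entity key; assumes the entity was registered via ClusterSharding.init.
  val TypeKey: EntityTypeKey[Update] = EntityTypeKey[Update]("MyActor")

  def run(consumerSettings: ConsumerSettings[String, MyAvro])(
      implicit system: ActorSystem[_]): Consumer.DrainingControl[Done] = {
    implicit val timeout: Timeout = 5.seconds // assumed ask timeout
    val sharding = ClusterSharding(system)

    // The ask completes when the entity replies, or fails with a timeout.
    def askUpdate(m: MyAvro): Future[Done] =
      sharding.entityRefFor(TypeKey, m.id).ask[Done](replyTo => Update(m, replyTo))

    Consumer
      .sourceWithOffsetContext(consumerSettings, Subscriptions.topics("myTopic"))
      // At most 4 asks are in flight; beyond that, demand is not signalled upstream.
      .mapAsync(parallelism = 4)(record => askUpdate(record.value()))
      // Offsets arrive here only after the corresponding ask has completed.
      .via(Committer.flowWithOffsetContext(CommitterSettings(system.toClassic)))
      .toMat(Sink.ignore)(Consumer.DrainingControl.apply)
      .run()
  }
}
```

On shutdown, calling drainAndShutdown() on the returned control lets in-flight asks and pending commits finish before the consumer stops.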

0
votes

This statement is not correct:

... as far as I understand, the back-pressure mechanism is controlled partially by Sinks, but in this stream configuration my Sink is only 'Sink.ignore'. So my Sink is not doing anything for back pressure.

There's nothing special about Sinks when it comes to backpressure. Backpressure, as a flow-control mechanism, is used automatically anywhere there's an asynchronous boundary in the stream. That MAY be in the Sink, but it may just as well be anywhere else in the stream.

In your case you're hooking up your stream to talk to an actor. That's your asynchronous boundary, but you do it with map, and inside that map you use ! to talk to the actor. So there is no backpressure, because:

  1. map is not an asynchronous operator, and nothing called inside it can participate in the backpressure mechanism. So from the Akka Streams perspective there is NO async boundary introduced.
  2. ! is fire-and-forget; there's no feedback about how busy the actor is that could be used to enforce any backpressure.

Like Levi mentioned, what you can do is change from a tell to an ask interaction and make the receiving actor respond when its work is done. Then you can use mapAsync like Levi describes. The difference between map and mapAsync is that the semantics of mapAsync are such that it emits downstream only when the returned Future completes. Even if parallelism is 1, the backpressure still works. If your Kafka records arrive much faster than your actor can handle them, mapAsync will backpressure upstream while it waits for the Future to complete.

In this particular case I think increasing parallelism makes little sense if all those messages end up in the same actor's mailbox, since that won't really speed anything up. If the interaction were, say, a REST call, it could improve the overall throughput, and depending on how your actor processes the messages, a higher parallelism for mapAsync may still increase throughput.

The parallelism value effectively caps the maximum number of not-yet-completed Futures allowed before backpressure kicks in.
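To see the ask-plus-mapAsync interaction without Kafka, here is a small sketch that needs only akka-actor-typed and akka-stream. Everything in it is hypothetical: the Update protocol with a replyTo field, the trivial actor that acknowledges immediately, and the in-flight counters, which exist only to observe that the number of uncompleted asks never exceeds parallelism.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.Done
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.AskPattern._
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout

import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration._

object MapAsyncAskSketch {
  // Hypothetical command: replyTo lets the actor acknowledge when its work is done.
  final case class Update(id: Int, replyTo: ActorRef[Done])

  // Hypothetical stand-in for the real actor: acknowledges every update right away.
  val updater: Behavior[Update] = Behaviors.receiveMessage[Update] {
    case Update(_, replyTo) =>
      replyTo ! Done
      Behaviors.same
  }

  /** Returns the emitted ids and the highest number of asks observed in flight. */
  def run(parallelism: Int): (Seq[Int], Int) = {
    implicit val system: ActorSystem[Update] = ActorSystem(updater, "sketch")
    implicit val timeout: Timeout = 3.seconds
    implicit val ec: ExecutionContext = system.executionContext
    val inFlight = new AtomicInteger(0)
    val peak = new AtomicInteger(0)
    try {
      val results = Source(1 to 20)
        .mapAsync(parallelism) { id =>
          // Record how many asks are outstanding when a new one is started.
          val now = inFlight.incrementAndGet()
          peak.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
          system.ask[Done](replyTo => Update(id, replyTo)).map { _ =>
            inFlight.decrementAndGet()
            id
          }
        }
        .runWith(Sink.seq)
      (Await.result(results, 10.seconds), peak.get())
    } finally system.terminate()
  }
}
```

Calling MapAsyncAskSketch.run(2) yields the ids in their original order, and the recorded peak stays at or below 2, since mapAsync does not pull a new element while parallelism futures are still pending.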