I have the following source queue definition.

lazy val (processMessageSource, processMessageQueueFuture) =
   peekMatValue(
      Source
        .queue[(ProcessMessageInputData, Promise[ProcessMessageOutputData])](5, OverflowStrategy.dropNew))


def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
  // Expose the materialized value through a Future that completes once the stream is run.
  val p = Promise[M]()
  val s = src.mapMaterializedValue { m =>
    p.trySuccess(m)
    m
  }
  (s, p.future)
}

ProcessMessageInputData is essentially an artifact created when a caller hits a web server endpoint that is hooked up to this stream (i.e. the service endpoint's business logic puts messages into this queue). The Promise[ProcessMessageOutputData] is completed downstream, in the sink of the application, and the web server registers an onComplete callback on its future to return the response.
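For concreteness, the downstream side described above might look roughly like the sketch below. The two case classes and the process function are placeholders invented for illustration (the real types are not shown here); only Sink.foreach and Promise.trySuccess are actual library calls.

```scala
import akka.Done
import akka.stream.scaladsl.Sink
import scala.concurrent.{Future, Promise}

object CompleteDownstream {
  // Placeholder types standing in for the real classes (assumption).
  final case class ProcessMessageInputData(payload: String)
  final case class ProcessMessageOutputData(result: String)

  // Hypothetical business logic, only here to make the sketch complete.
  def process(in: ProcessMessageInputData): ProcessMessageOutputData =
    ProcessMessageOutputData(in.payload.toUpperCase)

  // The sink completes the promise that travelled through the stream with
  // the input; the web server's callback on pr.future then fires.
  val completingSink
      : Sink[(ProcessMessageInputData, Promise[ProcessMessageOutputData]), Future[Done]] =
    Sink.foreach[(ProcessMessageInputData, Promise[ProcessMessageOutputData])] {
      case (in, pr) => pr.trySuccess(process(in))
    }
}
```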

There are also other sources of ingress into this stream.

Now the buffer may back up, since the other sources may overload the system and trigger stream backpressure. The existing code just drops the new message. I would still like the ProcessMessageOutputData promise to be completed with an exception stating something like "Throttled".

Is there a mechanism to write a custom overflow strategy, or a post-processing hook for the overflowed element, that allows me to do this?

The default strategy is to drop elements; you can simply change the OverflowStrategy to backpressure(). Check the docs: doc.akka.io/japi/akka/current/akka/stream/OverflowStrategy.html - T - Explorer
Yeah, but what would backpressure on a source queue even mean? - Arunav Sanyal

1 Answer

According to https://github.com/akka/akka/blob/master/akkastream/src/main/scala/akka/stream/impl/QueueSource.scala#L83, dropNew would work just fine. On the client's end it would look like this:

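import akka.stream.QueueOfferResult

// processMessageQueue is the materialized queue; the element is passed as a single tuple.
processMessageQueue.offer((in, pr)).foreach {
  case QueueOfferResult.Enqueued    => // Handle the case where the element was successfully enqueued.
  case QueueOfferResult.Dropped     => // Handle elements dropped because the buffer was full, e.g. fail pr with a "Throttled" exception.
  case QueueOfferResult.Failure(ex) => // Handle a failed stream.
  case QueueOfferResult.QueueClosed => // Handle a queue that was completed before the element was enqueued.
}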
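To tie this back to the "Throttled" requirement, here is a minimal sketch of a helper that fails the promise whenever the offer does not end in Enqueued. ThrottledException and offerOrFail are names made up for this example, not part of Akka; the sketch also assumes the queue passed in is the materialized SourceQueueWithComplete (e.g. the value the queue future from the question resolves to).

```scala
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.SourceQueueWithComplete
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

object ThrottledOffer {
  // Hypothetical exception type; it is not part of Akka.
  final class ThrottledException(message: String) extends RuntimeException(message)

  // Hypothetical helper: offers (in, promise) to the queue and fails the
  // promise right away if the element never made it into the stream.
  def offerOrFail[In, Out](
      queue: SourceQueueWithComplete[(In, Promise[Out])],
      in: In
  )(implicit ec: ExecutionContext): Future[Out] = {
    val pr = Promise[Out]()
    queue.offer((in, pr)).onComplete {
      case Success(QueueOfferResult.Enqueued) =>
        () // accepted; the sink is expected to complete pr later
      case Success(QueueOfferResult.Dropped) =>
        // dropNew discarded the element because the buffer was full
        pr.tryFailure(new ThrottledException("Throttled"))
      case Success(QueueOfferResult.Failure(cause)) =>
        pr.tryFailure(cause)
      case Success(QueueOfferResult.QueueClosed) =>
        pr.tryFailure(new IllegalStateException("Queue closed"))
      case Failure(cause) =>
        pr.tryFailure(cause)
    }
    pr.future
  }
}
```

The endpoint can then register its onComplete callback on the returned future as before and map a ThrottledException to whatever "throttled" response it wants. No custom overflow strategy is needed with this approach, since the drop is reported back to the caller of offer.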