I am working with a Scala 2.13 stack with the following technologies:
- play! framework 2.8
- akka typed 2.6.3
- alpakka kafka 2.0.3
An Akka-stream job reads event from Kafka, asks an actor to compute something, and based on the given response, produces new events back to Kafka.
The issue is the messages sent using the ask pattern seem to be consumed by the QuestionActor
(below) only when at least two messages are gathered by its mailbox and only one per message received.
The weird behavior is:
t1
ref ? Question("tr1", 1, None, actorRef)
> AskTimeoutException(tr1)
t2
ref ? Question("tr2", 1, None, actorRef)
> [INFO] - Question request for tr1-1. Processing.
> AskTimeoutException(tr2)
t3
ref ? Question("tr3", 1, None, actorRef)
> [INFO] - Question request for tr2-1. Processing.
> AskTimeoutException(tr3)
I'm trying then to understand why I observe this behavior and what I'm doing wrong.
The akka-stream Kafka pipeline is:
Consumer
.plainSource(consumerSettings, subscription)
.map(DeserializeEvents.fromService)
.filter(_.eventType == classOf[Item].getName)
.via(askFlowExplicit)
.withAttributes(ActorAttributes.supervisionStrategy(decider()))
.map(
response =>
new ProducerRecord[String, OutputItem](
topics,
OutputItem(response.getClass.getName, response)
)
)
.log("Kafka Pipeline")
.runWith(Producer.plainSink(producerSettings))
The decider is a supervision strategy, that resumes the job on Serialisation
and Timeout
exceptions; askFlowExplicit
declares an ask request to an external actor and - hereby - I bumped with my issue.
val askFlowExplicit =
ActorFlow.ask[OutputItem, Question, Answer](askTarget) {
case (envelope, replyTo) =>
val item = Serdes.deserialize[Item](envelope.payload)
Question(item.trID, item.id, item.user, replyTo)
}
The pipeline starts up on Play! application bootstrap
@Singleton
class ApplicationStart @Inject()(
configuration: Configuration,
questionActor: ActorRef[QuestionActor.Question]
) {
private implicit val logger = Logger.apply(getClass)
implicit val mat = context
AlpakkaPipeline.run(configuration, questionActor)
}
The actor is a simple typed actor belonging to the same actor system and - right now - it is only forwarding the request coming from the stream towards another service.
class QuestionActor(
configuration: Configuration,
context: ActorContext[Question],
itemService: ItemService
) extends AbstractBehavior[Question](context) {
import QuestionActor._
implicit val ec: ExecutionContextExecutor = context.executionContext
private implicit val timeout: Timeout = ...
override def onMessage(msg: Question): Behavior[Question] = Behaviors.receive[Question] {
case (context, Question(trID, id, user, sender)) =>
log.info(s"Question request for ${msg.trID}-${msg.id}. Processing.")
itemService
.action(id, user)
.onComplete {
case Success(result) if result.isEmpty =>
log.info("Action executed")
msg.replyTo ! NothingHappened(trID, id)
case Failure(e) =>
log.error("Action failed.", e)
msg.replyTo ! FailedAction(trID, id, user, e.getMessage)
}
Behaviors.same
}
}
object QuestionActor {
final case class Question(
trID: String,
id: Int,
user: Option[UUID],
replyTo: ActorRef[Answer]
)
def apply(itemService: ItemService, configuration: Configuration): Behavior[Question] =
Behaviors.setup { context =>
context.setLoggerName(classOf[QuestionActor])
implicit val log: Logger = context.log
new QuestionActor(configuration, context)
}
}
It is built using runtime DI and Play!
class BootstrapModule(environment: Environment, configuration: Configuration)
extends AbstractModule
with AkkaGuiceSupport {
override def configure(): Unit = {
bind(new TypeLiteral[ActorRef[CloneWithSender]]() {})
.toProvider(classOf[QuestionActorProvider])
.asEagerSingleton()
bind(classOf[ApplicationStart]).asEagerSingleton()
}
}
private class Question @Inject()(
actorSystem: ActorSystem,
itemService: ItemService,
configuration: Configuration
) extends Provider[ActorRef[Question]] {
def get(): ActorRef[Question] = {
val behavior = QuestionActor(itemService, configuration)
actorSystem.spawn(behavior, "question-actor")
}
}
What I tried
- changing dispatcher to
QuestionActor
- changing mailbox to
QuestionActor
- run the pipeline from within the
QuestionActor
- sending the same message from withing the actor constructor (to self), the same behavior observed: one further message will trigger the actor to consume the former, ask timeout for the latter.
What I didn't
- changing dispatcher to Akka stream pipeline
It looks to me as a threading problem right now, but I don't know where to go from here. Any help is really appreciated. Thank you in advance.
extends AbstractBehavior[Question](context)
, avoid passingcontext
as constructor argument, and just useBehaviors.setup + receiveMessage
. – Ivan StanislavciucAbstractBehavior
– spi-x-imsg.replyTo ! NothingHappened(trID, id)
withsender ! NothingHappened(trID, id)
– ignasi35