I am working with a Scala 2.13 stack with the following technologies:

  • play! framework 2.8
  • akka typed 2.6.3
  • alpakka kafka 2.0.3

An Akka Streams job reads events from Kafka, asks an actor to compute something, and, based on the response, produces new events back to Kafka.

The issue is that messages sent with the ask pattern seem to be consumed by the QuestionActor (below) only once at least two messages have accumulated in its mailbox, and then only one message is consumed for each new message received.

The weird behavior is:

t1

ref ? Question("tr1", 1, None, actorRef)
> AskTimeoutException(tr1)

t2

ref ? Question("tr2", 1, None, actorRef)
> [INFO] - Question request for tr1-1. Processing.
> AskTimeoutException(tr2)

t3

ref ? Question("tr3", 1, None, actorRef)
> [INFO] - Question request for tr2-1. Processing.
> AskTimeoutException(tr3)

I'm trying to understand why I observe this behavior and what I'm doing wrong.

The akka-stream Kafka pipeline is:

Consumer
  .plainSource(consumerSettings, subscription)
  .map(DeserializeEvents.fromService)
  .filter(_.eventType == classOf[Item].getName)
  .via(askFlowExplicit)
  .withAttributes(ActorAttributes.supervisionStrategy(decider()))
  .map(
    response =>
      new ProducerRecord[String, OutputItem](
        topics,
        OutputItem(response.getClass.getName, response)
      )
  )
  .log("Kafka Pipeline")
  .runWith(Producer.plainSink(producerSettings))

The decider is a supervision strategy that resumes the job on serialisation and timeout exceptions. askFlowExplicit declares an ask request to an external actor, and this is where I ran into my issue.

val askFlowExplicit =
  ActorFlow.ask[OutputItem, Question, Answer](askTarget) {
    case (envelope, replyTo) =>
      val item = Serdes.deserialize[Item](envelope.payload)
      Question(item.trID, item.id, item.user, replyTo)
  }

The pipeline starts up on Play! application bootstrap:

@Singleton
class ApplicationStart @Inject()(
    configuration: Configuration,
    questionActor: ActorRef[QuestionActor.Question]
) {
  private implicit val logger = Logger.apply(getClass)
  implicit val mat            = context
  AlpakkaPipeline.run(configuration, questionActor)
}

The actor is a simple typed actor belonging to the same actor system and, right now, it only forwards the request coming from the stream to another service.

class QuestionActor(
    configuration: Configuration,
    context: ActorContext[Question],
    itemService: ItemService
) extends AbstractBehavior[Question](context) {
  import QuestionActor._

  implicit val ec: ExecutionContextExecutor = context.executionContext
  private implicit val timeout: Timeout = ...

  override def onMessage(msg: Question): Behavior[Question] = Behaviors.receive[Question] {
    case (context, Question(trID, id, user, sender)) =>
      log.info(s"Question request for ${msg.trID}-${msg.id}. Processing.")
        itemService
          .action(id, user)
          .onComplete {
            case Success(result) if result.isEmpty =>
              log.info("Action executed")
              msg.replyTo ! NothingHappened(trID, id)
            case Failure(e) =>
              log.error("Action failed.", e)
              msg.replyTo ! FailedAction(trID, id, user, e.getMessage)
          }
      Behaviors.same
  }
}

object QuestionActor {
  final case class Question(
      trID: String,
      id: Int,
      user: Option[UUID],
      replyTo: ActorRef[Answer]
  )

  def apply(itemService: ItemService, configuration: Configuration): Behavior[Question] =
    Behaviors.setup { context =>
      context.setLoggerName(classOf[QuestionActor])
      implicit val log: Logger = context.log
      new QuestionActor(configuration, context, itemService)
    }
}

It is built using runtime DI and Play!

class BootstrapModule(environment: Environment, configuration: Configuration)
    extends AbstractModule
    with AkkaGuiceSupport {

  override def configure(): Unit = {
    bind(new TypeLiteral[ActorRef[Question]]() {})
      .toProvider(classOf[QuestionActorProvider])
      .asEagerSingleton()
    bind(classOf[ApplicationStart]).asEagerSingleton()
  }
}

private class QuestionActorProvider @Inject()(
    actorSystem: ActorSystem,
    itemService: ItemService,
    configuration: Configuration
) extends Provider[ActorRef[Question]] {
  def get(): ActorRef[Question] = {
    val behavior = QuestionActor(itemService, configuration)
    actorSystem.spawn(behavior, "question-actor")
  }
}

What I tried

  • changing the dispatcher of the QuestionActor
  • changing the mailbox of the QuestionActor
  • running the pipeline from within the QuestionActor
  • sending the same message from within the actor constructor (to self); the same behavior is observed: one further message triggers the actor to consume the former one, and the ask times out for the latter.

What I didn't

  • changing the dispatcher of the Akka Streams pipeline

It looks like a threading problem to me right now, but I don't know where to go from here. Any help is really appreciated. Thank you in advance.

Can you reproduce the problem by just sending messages to the actor without integrating it into the flow? - Ivan Stanislavciuc
It's hard to see where the problem is. But you have too many contexts defined. Try not to extend AbstractBehavior[Question](context), avoid passing context as a constructor argument, and just use Behaviors.setup + receiveMessage. - Ivan Stanislavciuc
Hi Ivan, yes, I tried sending the same message to self from within the actor constructor, and I can observe the same behavior (even if I don't use ask). I updated the question. - spi-x-i
Will try to avoid AbstractBehavior. - spi-x-i
Try replacing msg.replyTo ! NothingHappened(trID, id) with sender ! NothingHappened(trID, id). - ignasi35

1 Answer


The problem is that you're combining two styles: you extend AbstractBehavior, which provides onMessage, and inside onMessage you define a new Behaviors.receive[Question] behaviour. You have to use either one or the other.

Remove Behaviors.receive as follows:

  override def onMessage(msg: Question): Behavior[Question] = {
    // Handle the message directly; do not wrap the body in Behaviors.receive.
    // Without the pattern match, the fields are read from msg.
    log.info(s"Question request for ${msg.trID}-${msg.id}. Processing.")
    itemService
      .action(msg.id, msg.user)
      .onComplete {
        case Success(result) if result.isEmpty =>
          log.info("Action executed")
          msg.replyTo ! NothingHappened(msg.trID, msg.id)
        case Failure(e) =>
          log.error("Action failed.", e)
          msg.replyTo ! FailedAction(msg.trID, msg.id, msg.user, e.getMessage)
      }
    Behaviors.same
  }
}

AbstractBehavior.onMessage is itself the implementation of a behaviour. You receive the message as the method argument, you're supposed to process it there, and you return the next behaviour, Behaviors.same in your case.

But instead of processing the message, you create a new behaviour with Behaviors.receive and return it, and the Future callback inside it is still tied to the original first message. Nothing runs until the second message arrives and triggers the new behaviour, which is why you see the log statement for the first message only then.
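
The deferral described above can be reproduced without Akka at all. The following is only a toy sketch: ToyBehavior, Buggy, Fixed and run are hypothetical names made up for illustration, and the model ignores everything Akka actually does apart from "handle one message, return the behaviour for the next one".

```scala
import scala.collection.mutable.ListBuffer

object DeferredBehaviorDemo {
  // Hypothetical, simplified stand-in for a typed behaviour: handle one
  // message, return the behaviour that will handle the next one.
  trait ToyBehavior { def onMessage(msg: String): ToyBehavior }

  val log: ListBuffer[String] = ListBuffer.empty[String]

  // Mirrors the buggy shape: onMessage does no work for `msg`; it only
  // returns a new handler whose body still refers to `msg`.
  object Buggy extends ToyBehavior {
    def onMessage(msg: String): ToyBehavior = new ToyBehavior {
      def onMessage(next: String): ToyBehavior = {
        log += s"Processing $msg" // runs only when `next` arrives
        this                      // plays the role of Behaviors.same
      }
    }
  }

  // Mirrors the fix: the work happens inside onMessage itself.
  object Fixed extends ToyBehavior {
    def onMessage(msg: String): ToyBehavior = {
      log += s"Processing $msg"
      this
    }
  }

  // Feeds messages one at a time, always switching to the returned behaviour,
  // and returns the log lines produced along the way.
  def run(start: ToyBehavior, msgs: List[String]): List[String] = {
    log.clear()
    msgs.foldLeft(start)((behavior, msg) => behavior.onMessage(msg))
    log.toList
  }
}
```

With Buggy, feeding only "tr1" produces no log line at all; the line for tr1 appears once "tr2" is fed in. With Fixed, each message is logged as soon as it is handled.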

If you want to use FP style definitions, you have to stick to Behaviors.xxx helper methods only. If you choose OOP style, then you extend AbstractBehavior. But you shouldn't do both.
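
For comparison, here is a rough sketch of what the FP style could look like for this actor, using only Behaviors.setup and Behaviors.receiveMessage. The Answer hierarchy, the ItemService trait and its Future[Option[String]] return type are assumptions, since the question does not show them; the object name QuestionBehavior is also made up.

```scala
import java.util.UUID

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

object QuestionBehavior {
  // Assumed reply protocol; the question only shows how these are constructed.
  sealed trait Answer
  final case class NothingHappened(trID: String, id: Int) extends Answer
  final case class FailedAction(trID: String, id: Int, user: Option[UUID], reason: String)
      extends Answer

  final case class Question(trID: String, id: Int, user: Option[UUID], replyTo: ActorRef[Answer])

  // Hypothetical stand-in for the ItemService used in the question.
  trait ItemService {
    def action(id: Int, user: Option[UUID]): Future[Option[String]]
  }

  def apply(itemService: ItemService): Behavior[Question] =
    Behaviors.setup[Question] { context =>
      implicit val ec: ExecutionContextExecutor = context.executionContext

      // One handler, no class, no second context: each message is processed
      // as soon as it is received.
      Behaviors.receiveMessage[Question] { msg =>
        context.log.info(s"Question request for ${msg.trID}-${msg.id}. Processing.")
        itemService.action(msg.id, msg.user).onComplete {
          case Success(result) if result.isEmpty =>
            msg.replyTo ! NothingHappened(msg.trID, msg.id)
          case Success(_) =>
            () // the question does not show what to reply in this case
          case Failure(e) =>
            msg.replyTo ! FailedAction(msg.trID, msg.id, msg.user, e.getMessage)
        }
        Behaviors.same
      }
    }
}
```

The Future callback only touches msg and its replyTo, not the actor context, because the callback runs outside the actor's own message processing.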