Since it seems you have an actor that needs to do some processing and drive transitions in an FSM, I suggest you follow these guidelines (a basic code outline follows this list):
- Actor A receives the message in question that will trigger some processing
- Actor A transitions a separate FSM actor, say F (akka.actor.FSM), to the appropriate state. On startup, Actor A would spawn the specific FSM to track the state for the corresponding context (e.g. for all transactions, per transaction, or some other context). The code outline below uses all in-progress or completed transactions as the context for the example, but this may need to be changed.
- Actor A then triggers whatever processing the message calls for. Remember that actors generally shouldn't block, but here is an answer that provides more guidelines on when an Akka actor may block.
- Alternative: If you can trigger long-running processing without blocking, and can ensure that the other party sends you the necessary events after each processing stage, then you could eliminate the front Actor A and just have the FSM actor F. You should look at onTransition in this case.
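The steps above can be sketched roughly as follows. This is only a minimal sketch of the front actor A: `DoWork`, `Result`, the `fsmProps` parameter and the `process` function are hypothetical names introduced for illustration, and the two event classes are declared here only so that the snippet compiles on its own.

```scala
import akka.actor.{ Actor, ActorRef, Props }
import akka.pattern.pipe
import scala.concurrent.Future

object FrontActorSketch {
  // Hypothetical types, not taken from the question.
  final case class Result(value: String)
  final case class DoWork(txnId: Long)

  // Events understood by the FSM actor F.
  final case class ProcessingStarted(txnId: Long)
  final case class ProcessingFinished(txnId: Long, result: Result)

  // `fsmProps` creates F; `process` stands in for the real long-running work.
  class ActorA(fsmProps: Props, process: Long => Result) extends Actor {
    import context.dispatcher // execution context for the Future below

    // Spawn F once, when A starts.
    private val fsm: ActorRef = context.actorOf(fsmProps, "fsm")

    def receive: Receive = {
      case DoWork(txnId) =>
        // Tell F first, so it is in the right state before any result arrives.
        fsm ! ProcessingStarted(txnId)
        // Run the work outside of receive and pipe the outcome to F.
        // If the Future fails, F receives an akka.actor.Status.Failure instead.
        Future(process(txnId))
          .map(result => ProcessingFinished(txnId, result))
          .pipeTo(fsm)
    }
  }
}
```

If `process` really blocks, it would be safer to run the Future on a dedicated dispatcher rather than on the actor's own one, which is what this sketch uses for brevity.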
Based on what I understood from the question, my suggested code outline for the FSM actor F is:
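    import akka.actor.{ Actor, ActorRef, FSM, Props }
    import scala.concurrent.duration._ // only needed once you add state timeouts

    // `Result` stands for whatever your processing produces; it is assumed to be
    // defined elsewhere in your code.

    // Events sent to the FSM
    sealed trait MyEvents
    case class ProcessingStarted(txnId: Long) extends MyEvents
    case class ProcessingFinished(txnId: Long, result: Result) extends MyEvents

    // States of the FSM
    sealed trait MyStates
    case object Idle extends MyStates
    case object Processing extends MyStates

    // Data carried along with the state
    sealed trait MyDataTypes
    case object Uninitialized extends MyDataTypes
    case class StateData(processingIds: Seq[Long], resultMap: Map[Long, Result]) extends MyDataTypes

    class ActorF extends FSM[MyStates, MyDataTypes] {
      startWith(Idle, Uninitialized)

      when(Idle) {
        // First transaction ever seen
        case Event(ProcessingStarted(txnId), Uninitialized) =>
          goto(Processing) using StateData(Seq(txnId), Map.empty[Long, Result])
        // Idle again after earlier transactions: keep the results collected so far
        case Event(ProcessingStarted(txnId), StateData(_, resultMap)) =>
          goto(Processing) using StateData(Seq(txnId), resultMap)
      }

      when(Processing) {
        // Another transaction starts while others are still in flight
        case Event(ProcessingStarted(txnId), StateData(processingIds, resultMap)) =>
          stay using StateData(processingIds :+ txnId, resultMap)
        case Event(ProcessingFinished(txnId, result), StateData(processingIds, resultMap)) => {
          val remainingIds = processingIds diff Seq(txnId)
          val newResultMap = resultMap + (txnId -> result)
          // Return to Idle once nothing is in flight any more
          if (remainingIds.isEmpty) {
            goto(Idle) using StateData(remainingIds, newResultMap)
          } else {
            stay using StateData(remainingIds, newResultMap)
          }
        }
      }

      initialize()
    }

    // `system` is your ActorSystem; from inside Actor A use context.actorOf instead
    val f = system.actorOf(Props(new ActorF))
    f ! ProcessingStarted(txnId)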
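To make the bookkeeping in the outline easier to follow, here is a small sketch of the same state-data updates written as plain functions, without any Akka involved. The `Result` alias is a placeholder, and the `started` helper is an assumption about how additional in-flight transactions would be recorded; `finished` mirrors the `diff` and map update used in the outline.

```scala
object BookkeepingSketch {
  type Result = String // placeholder for your real result type

  final case class StateData(processingIds: Seq[Long], resultMap: Map[Long, Result])

  // Record a transaction as in flight (assumed behaviour, see the note above).
  def started(data: StateData, txnId: Long): StateData =
    data.copy(processingIds = data.processingIds :+ txnId)

  // Remove the finished transaction from the in-flight list and store its result.
  def finished(data: StateData, txnId: Long, result: Result): StateData =
    StateData(data.processingIds diff Seq(txnId), data.resultMap + (txnId -> result))

  def main(args: Array[String]): Unit = {
    val s0 = StateData(Seq.empty, Map.empty)
    val s1 = started(started(s0, 1L), 2L)
    val s2 = finished(s1, 1L, "ok")
    val s3 = finished(s2, 2L, "failed")
    println(s2.processingIds)         // transaction 2 is still in flight
    println(s3.processingIds.isEmpty) // nothing left, so the FSM would go back to Idle
    println(s3.resultMap)             // both results are kept
  }
}
```

Keeping these updates in pure functions like this also makes them easy to unit test separately from the actor.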
If you choose to trigger non-blocking processing requests to other parts of your code, you can use onTransition to add the trigger code as necessary. You may also want to change the tense of the MyEvents case classes. The past-tense event names above are meant to show that something else was responsible for triggering the processing (e.g. Actor A, which received the initial message to do some things).
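As a rough illustration of that alternative, the sketch below lets the FSM itself kick off the work from onTransition. The `Start`, `Finished` and `RunJob` messages and the `worker` actor reference are hypothetical names introduced here; the worker is assumed to answer with `Finished` when it is done.

```scala
import akka.actor.{ ActorRef, FSM }

object OnTransitionSketch {
  sealed trait State
  case object Idle extends State
  case object Processing extends State

  // Hypothetical messages, named in the imperative/past tense as suggested above.
  final case class Start(txnId: Long)    // command sent to the FSM
  final case class Finished(txnId: Long) // reply expected from the worker
  final case class RunJob(txnId: Long)   // non-blocking request sent to the worker

  // The state data is simply the set of transaction IDs in flight.
  class TriggeringFsm(worker: ActorRef) extends FSM[State, Set[Long]] {
    startWith(Idle, Set.empty[Long])

    when(Idle) {
      case Event(Start(txnId), ids) =>
        goto(Processing) using (ids + txnId)
    }

    when(Processing) {
      case Event(Finished(txnId), ids) =>
        val remaining = ids - txnId
        if (remaining.isEmpty) goto(Idle) using remaining
        else stay using remaining
    }

    onTransition {
      case Idle -> Processing =>
        // nextStateData holds the data of the state being entered.
        nextStateData.foreach(id => worker ! RunJob(id))
      case Processing -> Idle =>
        log.info("All transactions finished")
    }

    initialize()
  }
}
```

Note that `stay` does not run the onTransition handlers, so a `Start` that arrives while already in Processing would have to message the worker directly from the `when(Processing)` block.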
Also take note of Akka's supervision capabilities, which could be used here to supervise the actors in question.
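For example, if Actor A creates F as its child, A can decide what happens when F throws. The following is only a sketch under that assumption: the actor name, the `fsmProps` parameter and the choice of exceptions and directives are illustrative, not something the question prescribes.

```scala
import akka.actor.{ Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy }
import akka.actor.SupervisorStrategy.{ Escalate, Restart, Resume }
import scala.concurrent.duration._

object SupervisionSketch {
  class SupervisingActorA(fsmProps: Props) extends Actor {
    // Applied to failures of this actor's children, i.e. the FSM actor F.
    override val supervisorStrategy: SupervisorStrategy =
      OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
        case _: IllegalArgumentException => Resume   // keep F's current state and data
        case _: IllegalStateException    => Restart  // fresh F, starting again from startWith
        case _: Exception                => Escalate // let A's own supervisor decide
      }

    private val fsm: ActorRef = context.actorOf(fsmProps, "fsm")

    def receive: Receive = {
      // Pass everything on to F, keeping the original sender.
      case msg => fsm forward msg
    }
  }
}
```

Keep in mind that a restart creates a new instance of F, so any state data it had collected is lost unless you persist it elsewhere.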
For more details, read the following, which might help further with building your FSM, testing it, and using non-blocking methods to trigger long-running processing externally, all of which might be useful for your needs: