I intend to model a trivial game-play (HTTPReq/HTTPResp) using Akka Streams. In a round, the player is challenged to guess a number by the server. Server checks the player's response and if the what server holds and what the player guesses are the same, then the player is given a point.
A typical flow is like this:
- Player (already authenticated, sessionID assigned) requests to start a round
- Server checks if the sessionID is valid; if it is not, the player is informed with a suitable message
- Server generates a number and offers to the player, along with a RoundID
... so on. Nothing extraordinary.
This is a rough arrangement of types and the flows:
import akka.{Done, NotUsed}
import akka.stream.scaladsl.{Flow, Keep, Source}
import java.util.Random
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
import scala.util.{Failure, Success}
sealed trait GuessingGameMessageToAndFro
case class StartARound(sessionID: String) extends GuessingGameMessageToAndFro
case class RoundStarted(sessionID: String, roundID: Int) extends GuessingGameMessageToAndFro
case class NumberGuessed(sessionID: String, roundID: Int, guessedNo: Int) extends GuessingGameMessageToAndFro
case class CorrectNumberGuessed(sessionID: String, nextRoundID: Int) extends GuessingGameMessageToAndFro
case class FinalRoundScore(sessionID: String, finalScore: Int) extends GuessingGameMessageToAndFro
case class MissingSession(sessionID: String) extends GuessingGameMessageToAndFro
case class IncorrectNumberGuessed(sessionID: String, clientGuessed: Int, serverChose: Int) extends GuessingGameMessageToAndFro
object SessionService {
def exists(m: StartARound) = if (m.sessionID.startsWith("1")) m else MissingSession(m.sessionID)
}
object NumberGenerator {
def numberToOfferToPlayer(m: GuessingGameMessageToAndFro) = {
m match {
case StartARound(s) => RoundStarted(s, new Random().nextInt())
case MissingSession(s) => m
case _ => throw new RuntimeException("Not yet implemented")
}
}
}
val sessionExistenceChecker: Flow[StartARound,GuessingGameMessageToAndFro,NotUsed]
= Flow.fromFunction(m => SessionService.exists(m))
val guessNumberPreparator: Flow[GuessingGameMessageToAndFro,GuessingGameMessageToAndFro,_]
= Flow.fromFunction(m => NumberGenerator.numberToOfferToPlayer(m))
val s1 = StartARound("123")
val k =
Source
.single(s1)
.via(sessionExistenceChecker)
.via(guessNumberPreparator)
.toMat(Sink.head)(Keep.right)
val finallyObtained = k.run
finallyObtained.onComplete(v => {
v match {
case Success(x) => // Prepare proper HTTP Response
case Failure(ex) => // Prepare proper HTTP Response
}
})
The reason I am going through a long pattern matching block in numberToOfferToPlayer() (I have shown 2 here, but obviously its size will increase with every type that can flow) is because if the operator like sessionExistenceChecker generates a MissingSession (which is an error condition), it has to travel through the rest of stream, unchanged till it reaches the Future[Done] stage. In fact, the problem is more general: at any stage, a proper transformation should result into an acceptable type or an error type (mutually exclusive). If I follow this approach, the pattern-matching blocks will proliferate, at the cost of unnecessary repetition, if not ugliness perhaps.
I am feeling uncomfortable with this solution of mine. It is becoming verbose and ungainly.
Needless to say, I have not shown the Akka-HTTP facing part here (including the Routes). The code above can be easily stitched, with the route handlers. So, I have skipped it.
My question is: what is a right idiom for such streams? Conceptually speaking, if everything is fine, the elements should keep moving along the stream. However, whenever an error occurs, the (error) element should shoot off to the final stage, directly, skipping all other stages in between. What is the accepted way to model this?
I have gone through a number of Stackoverflow posts, which demonstrate that for similar situations, one should go the partition/merge way. I understand how I can adopt that approach, but for simple cases like mine, that seems to be unnecessary work. Or, am I completely off the mark here?
Any hint, snippet or rap on the knuckles, will be appreciated.