2
votes

I'm trying to create a Source that provides OAuth2 tokens and also takes care of refreshing expired ones. Currently my code looks roughly like this:

  case class Token(expires: Instant = Instant.now().plus(100, ChronoUnit.MILLIS)){
    def expired = Instant.now().isAfter(expires)
  }

  Source
    .repeat(()) // repeat needs an element; Unit serves as a dummy trigger
    .mapAsync(1){ _ =>
      println("  -> token req")
      // this fakes an async token request to the token service
      Future{
        Thread.sleep(500)
        println("  <- token resp")
        Token()
      }
    }
    .mapAsync(1){ token =>
      println("  -> req with token auth")
      if(token.expired){
        println("!!! Received expired token")
      }
      // this is the actual call that needs the token
      println("making call")
      Future{
        Thread.sleep(2000)
        println("  <- req resp")
        "OK"
      }
    }
    .take(2)
    .runWith(Sink.ignore)
    .recover{case e => ()}
    .flatMap{ _ =>
      system.terminate()
    }

The output of this code looks like this:

root   -> token req
root   <- token resp
root   -> token req
root   -> req with token auth
root making call
root   <- token resp
root   -> token req
root   <- token resp
root   -> token req
root   <- token resp
root   -> token req
root   <- req resp
root   -> req with token auth
root !!! Received expired token
root making call
root   <- token resp
root   -> token req
root   <- token resp
root   -> token req
root   <- token resp
root   <- req resp
root   -> req with token auth
root !!! Received expired token
root making call
root ... finished with exit code 0

Clearly, mapAsync(1) is producing demand when it is not expected (prefetching?).

There are 2 issues:

  • demand causes unneeded token requests upstream
  • the pre-fetching/caching of the tokens is problematic as they are only valid for a specific amount of time

So how do I create a true pull stream that behaves like this function?

def tokenSource: () => Future[Token]

2 Answers

3
votes

If you are purposefully trying to avoid pre-fetching and queuing, then I think scala.collection.immutable.Stream, or Iterator, is a better solution than Akka Streams.

Below is an example implementation that avoids the pitfalls you enumerated in your question. (Note: I used an ActorSystem to provide the ExecutionContext, via its dispatcher, to prevent the application from exiting before the sleep invocations have time to complete. This takes advantage of the fact that an ActorSystem does not shut down just because the main function returns: its threads are non-daemon by default, unlike those of scala.concurrent.ExecutionContext.global.)

import scala.collection.immutable.Stream
import scala.concurrent.Future

object ScalaStreamTest extends App {    
  case class Token(expires: Long = System.currentTimeMillis() + 100){
    def expired = System.currentTimeMillis() > expires
  }

  val actorSystem = akka.actor.ActorSystem()      
  import actorSystem.dispatcher

  def createToken =  Future {
    Thread.sleep(500)
    println("  <- token resp")
    Token()
  }

  def checkExpiration(token : Future[Token]) = token map { t =>
    println("  -> req with token auth")
    if(t.expired){println("!!! Received expired token")}
    t
  }

  def makeCall(token : Future[Token]) = token flatMap { t =>
    println("making call")
    Future {
      Thread.sleep(2000)
      println("  <- req resp")
      "OK"
    }
  }

  val stream = Stream.continually(createToken)
                     .map(checkExpiration)
                     .map(makeCall)
                     .take(2)
                     .force
}//end object ScalaStreamTest

A force call is necessary because a Stream is lazy in its tail: continually, map, and take evaluate only the head eagerly and defer everything after it. Without force (or another operation that traverses the Stream, such as a fold or foreach), only the first element would be computed, and the second token request and call would never run.
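The laziness described above can be checked with a small counter. This is a minimal sketch using only the standard library; the object and member names (StreamLazinessSketch, nextElement, and so on) are made up for illustration, and nextElement merely stands in for createToken. Note that Stream evaluates its head eagerly, so one element is computed even before force. Stream is deprecated since Scala 2.13 in favour of LazyList, whose head is lazy as well, so the code compiles there with a deprecation warning.

```scala
import java.util.concurrent.atomic.AtomicInteger

object StreamLazinessSketch {
  // Counts how many elements have actually been computed.
  val evaluated = new AtomicInteger(0)

  // Hypothetical stand-in for createToken: each evaluation bumps the counter.
  def nextElement(): Int = evaluated.incrementAndGet()

  // Building the pipeline computes only the head of the Stream.
  val pipeline: Stream[Int] =
    Stream.continually(nextElement()).map(_ * 10).take(2)
  val evaluatedBeforeForce: Int = evaluated.get() // 1: head only

  // force walks the remaining (lazy) tail, computing the second element.
  val forced: List[Int] = pipeline.force.toList   // List(10, 20)
  val evaluatedAfterForce: Int = evaluated.get()  // 2
}
```

Because take(2) bounds the Stream, force stops after the second element; calling force on the unbounded Stream.continually(...) would never return.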

0
votes

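Akka Streams prefetches by design to keep the pipeline saturated: stages such as mapAsync signal demand upstream as soon as they have room, not only when the final consumer asks for an element.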

To get what you want, I suggest that you create a Source[Token, _] that emits a new Token when the old one expires rather than on request. Then zip your Source of data with that Source of Tokens and work with the resulting pairs.
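The refresh-on-expiry idea can also be sketched without any streaming library. The following is not the zip-based Source described above but a swapped-in, standard-library variant of the same principle: a token is reused until it has expired, and a new one is requested only on the first call after that. All names here (TokenCacheSketch, CachedTokenSource, fetch, clock, and the id field on Token) are hypothetical and chosen for illustration only.

```scala
import java.time.Instant
import scala.concurrent.Future
import scala.util.{Failure, Success}

object TokenCacheSketch {
  // Hypothetical token; `id` exists only so tokens can be told apart.
  final case class Token(id: Int, expires: Instant) {
    def expiredAt(now: Instant): Boolean = now.isAfter(expires)
  }

  // Wraps a fetch function so that a token is reused until it expires.
  // `fetch` and `clock` are assumptions for illustration, not a library API.
  final class CachedTokenSource(fetch: () => Future[Token],
                                clock: () => Instant = () => Instant.now()) {
    private var current: Option[Future[Token]] = None

    def get(): Future[Token] = synchronized {
      val reusable = current.filter { pending =>
        pending.value match {
          case None                 => true  // request still in flight: share it
          case Some(Success(token)) => !token.expiredAt(clock())
          case Some(Failure(_))     => false // failed request: try again
        }
      }
      reusable.getOrElse {
        val fresh = fetch() // only reached when no usable token is cached
        current = Some(fresh)
        fresh
      }
    }
  }
}
```

The get method has the same shape as the () => Future[Token] function from the question, so it could be called from inside a mapAsync stage right before the request that needs the token. A token can still expire between get() returning and the request being sent, so a real implementation would probably refresh slightly before the actual expiry time.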