I'm trying to create a Source that provides OAuth2 tokens and also takes care of refreshing expired tokens. Currently my code looks roughly like this:
    import java.time.Instant
    import java.time.temporal.ChronoUnit
    import akka.stream.scaladsl.{Sink, Source}
    import scala.concurrent.Future

    // Assumes an implicit ActorSystem named `system`, a materializer and an
    // ExecutionContext are in scope.

    // A token that expires 100 ms after it is created
    case class Token(expires: Instant = Instant.now().plus(100, ChronoUnit.MILLIS)) {
      def expired: Boolean = Instant.now().isAfter(expires)
    }

    Source
      .repeat(())
      .mapAsync(1) { _ =>
        println(" -> token req")
        // this fakes an async token request to the token service
        Future {
          Thread.sleep(500)
          println(" <- token resp")
          Token()
        }
      }
      .mapAsync(1) { token =>
        println(" -> req with token auth")
        if (token.expired) {
          println("!!! Received expired token")
        }
        // this is the actual call that needs the token
        println("making call")
        Future {
          Thread.sleep(2000)
          println(" <- req resp")
          "OK"
        }
      }
      .take(2)
      .runWith(Sink.ignore)
      .recover { case e => () }
      .flatMap { _ =>
        system.terminate()
      }
The output of this code looks like this:
root -> token req
root <- token resp
root -> token req
root -> req with token auth
root making call
root <- token resp
root -> token req
root <- token resp
root -> token req
root <- token resp
root -> token req
root <- req resp
root -> req with token auth
root !!! Received expired token
root making call
root <- token resp
root -> token req
root <- token resp
root -> token req
root <- token resp
root <- req resp
root -> req with token auth
root !!! Received expired token
root making call
root ... finished with exit code 0
Clearly, mapAsync(1) is signalling demand upstream when I don't expect it (prefetching?).
There are 2 issues:
- demand causes unneeded token requests upstream
- the pre-fetching/caching of the tokens is problematic as they are only valid for a specific amount of time
So how do I create a true pull stream that behaves like this function?
    def tokenSource: () => Future[Token]
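To make the desired behaviour concrete, here is a minimal sketch of what I mean by such a function, using only the standard library (no Akka). The class name `OnDemandTokens` and the `fetch` parameter are hypothetical names made up for illustration, and `fetch` stands in for the real call to the token service. It only requests a token when one is asked for, and it reuses the previous one while that is still valid or still in flight.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit
import scala.concurrent.Future
import scala.util.{Failure, Success}

case class Token(expires: Instant = Instant.now().plus(100, ChronoUnit.MILLIS)) {
  def expired: Boolean = Instant.now().isAfter(expires)
}

// Hypothetical helper: `fetch` is assumed to perform the async token request.
final class OnDemandTokens(fetch: () => Future[Token]) {
  // The most recent request, if any; guarded by `synchronized`.
  private var last: Option[Future[Token]] = None

  def tokenSource(): Future[Token] = synchronized {
    val reusable = last.filter { f =>
      f.value match {
        case None             => true       // still in flight: share the pending request
        case Some(Success(t)) => !t.expired // completed: reuse only while still valid
        case Some(Failure(_)) => false      // failed: try again
      }
    }
    reusable.getOrElse {
      val f = fetch() // nothing usable: request a new token now, on demand
      last = Some(f)
      f
    }
  }
}
```

Called twice in quick succession, this issues a single request. Once the token has expired, the next call issues a new one. Note that a token can still expire between being handed out and being used, so this narrows the window rather than closing it.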