7
votes

I want a Source that evaluates a function at given intervals and emits its output. As a workaround I can do it with Source.queue plus offer, but I haven't found a cleaner way. Ideally I would have something like

def myFunction() = ....                     // function with side-effects 
Source.tick(1.second, 1.second, myFunction) // myFunction is evaluated at every tick

Any ideas?


2 Answers

15
votes

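Probably the cleanest way is to use map: let Source.tick emit a placeholder element and call the function in the map stage, so that it is evaluated once per tick.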

Source.tick(1.second, 1.second, NotUsed).map(_ => myFunction())

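
For completeness, here is a minimal runnable sketch of the tick + map approach. The object name TickMapSketch, the run helper, the counter-based myFunction, and the 100 ms interval are made up for illustration; the materializer setup mirrors the other answer and assumes an Akka version in which ActorMaterializer is still available.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, not part of Akka.
object TickMapSketch {

  // Runs the stream until it has produced n results, then returns them.
  def run(n: Int): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("tick-map-sketch")
    // Deprecated since Akka 2.6 but kept here to match the other answer.
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Illustrative side-effecting function: returns 1, 2, 3, ...
    var counter = 0
    def myFunction(): Int = { counter += 1; counter }

    try {
      val results = Source
        .tick(0.seconds, 100.millis, NotUsed) // placeholder element on every tick
        .map(_ => myFunction())               // the function is evaluated per tick
        .take(n)                              // stop after n elements
        .runWith(Sink.seq)
      Await.result(results, 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Calling TickMapSketch.run(3) should return the results 1, 2, 3. Keep in mind that Source.tick does not queue ticks: if downstream has not requested an element when a tick fires, that tick is not delivered later, so a slow consumer simply sees fewer evaluations.
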
1
votes

I guess throttle is what you need. Here is a fully runnable example in which the Source is built from an Iterable whose iterator calls the function in next():

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ThrottleMode.Shaping
import akka.stream.scaladsl.Source

import scala.concurrent.duration._

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer() // deprecated since Akka 2.6
var i = 0 // mutable state, so that myFunction() has a visible side effect

def myFunction(): Int = {
  i = i + 1
  i
}

import scala.collection.immutable.Iterable

val x: Iterable[Int] = new Iterable[Int] {
  override def iterator: Iterator[Int] =
    new Iterator[Int] {
      override def hasNext: Boolean = true

      override def next(): Int = myFunction()
    }
}

// Emit at most one element per second; each element comes from a call to myFunction()
Source(x).throttle(1, 1.second, 1, Shaping).runForeach(println)

The throttle parameters mean: emit 1 element per 1 second with a maximum burst of 1. The Shaping mode makes the stage pause before emitting so that the rate is met; the alternative, Enforcing, fails the stream when upstream is faster than the allowed rate.
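
The hand-written Iterable above can probably be shortened with Iterator.continually, which re-evaluates its by-name argument on every next(). The following is only a sketch under that assumption; ThrottleSketch, run, and the 100 ms rate are illustrative names and values, not anything from the question.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, not part of Akka.
object ThrottleSketch {

  // Runs the throttled stream until it has produced n results.
  def run(n: Int): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("throttle-sketch")
    // Deprecated since Akka 2.6 but kept here to match the example above.
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Illustrative side-effecting function: returns 1, 2, 3, ...
    var counter = 0
    def myFunction(): Int = { counter += 1; counter }

    try {
      val results = Source
        // Infinite iterator; myFunction() is called once per requested element
        .fromIterator(() => Iterator.continually(myFunction()))
        // At most 1 element per 100 ms, burst of 1, delaying rather than failing
        .throttle(1, 100.millis, 1, ThrottleMode.Shaping)
        .take(n)
        .runWith(Sink.seq)
      Await.result(results, 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

One difference from the tick + map approach is worth checking for your use case: here the function is called when the throttle stage requests the next element, which may be somewhat earlier than the moment that element is actually emitted downstream.
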