I use StreamRefs to establish streaming connections between actors in the cluster. Currently, on the writing node, I save incoming messages to a log file manually, and I wonder whether I could replace that with a persistent Sink for writing and a persistent Source for reading on actor startup, both backed by the Akka Persistence journal. I have been thinking of replacing the log file sink with a persistent actor's persist { evt => ... }, but since persist executes asynchronously I would lose backpressure. So: is it possible to write streaming data into the Akka Persistence journal with backpressure, and to read it back in a streaming manner when the actor recovers?
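For context, here is the kind of shape I imagine for the write side (a rough sketch only, not something I have working): a PersistentActor driven through Sink.actorRefWithAck that acks each element only inside the persist callback, so the stream delivers the next element only after the previous event has reached the journal. The JournalWriter name and the Init/Ack/Complete messages are made up for the example, and it assumes classic Akka Persistence (2.5.x):

import akka.persistence.PersistentActor
import akka.stream.scaladsl.Sink
import akka.util.ByteString

object JournalWriter {
  case object Init
  case object Ack
  case object Complete
}

// hypothetical sketch: persists each stream element and acks only after the write completes
class JournalWriter(userId: String) extends PersistentActor {
  import JournalWriter._

  override def persistenceId: String = s"writer-$userId"

  override def receiveCommand: Receive = {
    case Init =>
      sender() ! Ack // let the stream start emitting
    case bytes: ByteString =>
      // acking inside the persist callback is what preserves backpressure:
      // the next element is requested only after this event is in the journal
      persist(bytes) { _ => sender() ! Ack }
    case Complete =>
      context.stop(self)
  }

  override def receiveRecover: Receive = {
    case _: ByteString => // replayed events would rebuild state here
  }
}

// backpressured "persistent sink" (Akka 2.5 name; 2.6 renames it to actorRefWithBackpressure):
// Sink.actorRefWithAck[ByteString](journalWriterRef, JournalWriter.Init, JournalWriter.Ack, JournalWriter.Complete)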
Current implementation:
import java.nio.file.{FileSystems, Files}
import java.nio.file.StandardOpenOption.{APPEND, CREATE, WRITE}

import akka.actor.Actor
import akka.pattern.pipe
import akka.stream.{ActorMaterializer, IOResult, SinkRef, SourceRef}
import akka.stream.scaladsl.{FileIO, Sink, Source, StreamRefs}
import akka.util.ByteString

import scala.concurrent.Future

object Writer {
  case class WriteSinkRequest(userId: String)
  case class WriteSinkReady(userId: String, sinkRef: SinkRef[ByteString])
  case class ReadSourceRequest(userId: String)
  case class ReadSourceReady(userId: String, sourceRef: SourceRef[ByteString])
}

class Writer extends Actor {
  import Writer._

  // code omitted
  implicit val mat: ActorMaterializer = ActorMaterializer()
  import context.dispatcher // ExecutionContext for map/pipeTo

  val logsDir = "logs"
  val path = Files.createDirectories(FileSystems.getDefault.getPath(logsDir))

  def logFile(id: String) = path.resolve(id)

  def logFileSink(logId: String): Sink[ByteString, Future[IOResult]] =
    FileIO.toPath(logFile(logId), Set(CREATE, WRITE, APPEND))

  def logFileSource(logId: String): Source[ByteString, Future[IOResult]] =
    FileIO.fromPath(logFile(logId))

  override def receive: Receive = {
    case WriteSinkRequest(userId) =>
      // obtain the sink we want to offer:
      val sink = logFileSink(userId)
      // materialize the SinkRef (the remote side acts as a source of data for us):
      val ref: Future[SinkRef[ByteString]] = StreamRefs.sinkRef[ByteString]().to(sink).run()
      // wrap the SinkRef in a domain message, so the sender knows which sink it is
      val reply: Future[WriteSinkReady] = ref.map(WriteSinkReady(userId, _))
      // reply to sender
      reply.pipeTo(sender())

    case ReadSourceRequest(userId) =>
      val source = logFileSource(userId)
      val ref: Future[SourceRef[ByteString]] = source.runWith(StreamRefs.sourceRef())
      val reply: Future[ReadSourceReady] = ref.map(ReadSourceReady(userId, _))
      reply.pipeTo(sender())
  }
}
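On the read side, the closest streaming analogue to "read on recover" that I can see is Akka Persistence Query: currentEventsByPersistenceId gives a Source of journaled events that could be offered as a SourceRef exactly like logFileSource above. A sketch, assuming the LevelDB read journal plugin and the writer-$userId persistence id from the earlier sketch:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.stream.scaladsl.Source
import akka.util.ByteString

def journalSource(userId: String)(implicit system: ActorSystem): Source[ByteString, NotUsed] = {
  val readJournal = PersistenceQuery(system)
    .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)

  readJournal
    .currentEventsByPersistenceId(s"writer-$userId", 0L, Long.MaxValue)
    .map(_.event)
    .collect { case bytes: ByteString => bytes } // journaled payloads back as ByteString
}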
P.S. Is it possible to create not a "save-to-journal" sink, but a flow:
incoming data to write ~> save to persistence journal ~> data that was written
?
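What I have in mind is roughly the following shape (again only a sketch; Persist/Persisted is a hypothetical protocol where the journal-writing actor replies with Persisted(data) from inside its persist callback): persist each element via ask and emit only what was confirmed as written, with mapAsync(1) keeping ordering and backpressure.

import akka.NotUsed
import akka.actor.ActorRef
import akka.pattern.ask
import akka.stream.scaladsl.Flow
import akka.util.{ByteString, Timeout}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

// hypothetical protocol: the journal-writing actor answers Persist(data) with Persisted(data)
// only after persist(...) has completed for that element
case class Persist(data: ByteString)
case class Persisted(data: ByteString)

def persistFlow(journalWriter: ActorRef)(implicit ec: ExecutionContext): Flow[ByteString, ByteString, NotUsed] = {
  implicit val timeout: Timeout = Timeout(5.seconds)
  Flow[ByteString].mapAsync(parallelism = 1) { bytes =>
    (journalWriter ? Persist(bytes)).mapTo[Persisted].map(_.data)
  }
}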