0
votes

When MyActor receives the Start message, it runs an Akka Stream and publishes each item it receives to the Akka Event Stream.

class MyActor (implicit system: ActorSystem, materialize: Materializer, ec: ExecutionContext) extends Actor {

  override def receive: Receive = {
    case Start =>
      someSource
        .toMat(Sink.foreach(item => system.eventStream.publish(item)))(Keep.left)
        .run()
  }
}

Now, in another block of code, I would like to build a Source from the items published to that event stream, so that every published item can be processed in another Akka Stream.

How can I do that?

In case it opens up more options, note that the other block of code in question is a Play Framework WebSocket handler.

2
Does this need to be placed into an actor (you can easily run system.eventStream.publish(item) outside of an actor)? - Ivan Stanislavciuc
Yes, I know; I just extracted that code from a bigger one. It has to do with someSource: the actor will clean up resources on Shutdown ... But how can I consume the Akka event stream in an Akka Stream Source? - acmoune
I mean, is there something like Source.fromAkkaEvent[classOf[Item]], or can we build that? - acmoune
I am still struggling to build it using Source.queue, but the main issue is that only an Actor can subscribe to the ActorSystem's Event Stream. I really want to use the Event Stream because I want to keep the publisher and subscriber decoupled. I think I need to find a way to integrate an Actor into the stream processing. - acmoune

2 Answers

1
votes

This seems like an XY problem. If the publisher and subscriber end up decoupled, what should happen if the publisher produces data faster than the subscriber can consume it?

With that said, here's a way to do what you asked for:

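import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import scala.reflect.{ClassTag, classTag}

/** Produce a source by subscribing to the Akka actorsystem event bus for a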
  * specific event type.
  * 
  * @param bufferSize max number of events to buffer up in the source
  * @param overflowStrategy what to do if the event buffer fills up
  */
def itemSource[Item : ClassTag](
  bufferSize: Int = 1000,
  overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew
)(
  implicit system: ActorSystem
): Source[Item, NotUsed] = Source
  .lazily { () =>
    val (actorRef, itemSource) = Source
      .actorRef[Item](
        completionMatcher = PartialFunction.empty,
        failureMatcher = PartialFunction.empty,
        bufferSize,
        overflowStrategy
      )
      .preMaterialize()

    system.eventStream.subscribe(actorRef, classTag[Item].runtimeClass)

    itemSource
  }
  .mapMaterializedValue(_ => NotUsed)
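
As a usage illustration, here is a minimal sketch of the same idea that also removes the event stream subscription once the stream terminates. It assumes Akka 2.6 (so the implicit ActorSystem also provides the Materializer). The Item case class, the eventSource helper and the firstThree method are hypothetical names introduced only for this example; they are not part of the question or of Akka.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.reflect.{ClassTag, classTag}

// Hypothetical event type, used only for this illustration.
final case class Item(id: Int)

object EventStreamSourceSketch {

  // Hypothetical helper: subscribes each time the source is materialized
  // and unsubscribes when that stream completes, fails or is cancelled.
  def eventSource[T: ClassTag](bufferSize: Int)(
      implicit system: ActorSystem
  ): Source[T, NotUsed] =
    Source
      .actorRef[T](
        completionMatcher = PartialFunction.empty,
        failureMatcher = PartialFunction.empty,
        bufferSize = bufferSize,
        overflowStrategy = OverflowStrategy.dropNew
      )
      .mapMaterializedValue { ref =>
        system.eventStream.subscribe(ref, classTag[T].runtimeClass)
        ref
      }
      .watchTermination() { (ref, done) =>
        done.onComplete(_ => system.eventStream.unsubscribe(ref))(system.dispatcher)
        NotUsed
      }

  // Collects the first three Item events published after materialization.
  def firstThree()(implicit system: ActorSystem): Seq[Item] = {
    val received = eventSource[Item](bufferSize = 16).take(3).runWith(Sink.seq)
    // Events published before the stream was materialized are not replayed.
    (1 to 3).foreach(i => system.eventStream.publish(Item(i)))
    Await.result(received, 3.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    try println(firstThree())
    finally system.terminate()
  }
}
```

Because Source.actorRef cannot backpressure the event stream, the buffer size and overflow strategy decide which items are lost when the consumer is slower than the publisher, which is the concern raised at the top of this answer.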

1
votes

I finally got it to work with BroadcastHub instead; the Akka Event Stream is no longer involved.

My publisher (which is itself consuming a Source) looks like this:

val publisherSource = someSource
  .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)
  .run()

Then in another code block, I just need a reference to the publisherSource:

val subscriberSource = publisherSource
  .map(...) // Whatever

You can have as many subscriberSources as you want; they will all receive the same items.
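
For completeness, here is a small self-contained sketch of the approach above, assuming Akka 2.6. The tick-based counter is only a stand-in for someSource, and the object and method names (BroadcastHubSketch, demo) are hypothetical.

```scala
import akka.NotUsed
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BroadcastHubSketch {

  // Runs one publisher and two subscribers; returns what each subscriber saw.
  def demo()(implicit system: ActorSystem): (Seq[Long], Seq[Long]) = {
    // Stand-in for someSource: an endless counter emitting every 50 ms.
    val someSource: Source[Long, Cancellable] =
      Source.tick(0.millis, 50.millis, ()).zipWithIndex.map(_._2)

    // Materializing the hub yields a Source that can be run any number of times.
    val publisherSource: Source[Long, NotUsed] =
      someSource.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right).run()

    // Two independent subscribers attached to the same publisher.
    val first = publisherSource.take(3).runWith(Sink.seq)
    val second = publisherSource.map(_ * 10).take(3).runWith(Sink.seq)

    (Await.result(first, 5.seconds), Await.result(second, 5.seconds))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("hub-sketch")
    try println(demo())
    finally system.terminate()
  }
}
```

A subscriber only sees items emitted after it attaches, so the two sequences may start at different positions. While no subscriber is attached, the hub backpressures the upstream instead of dropping items. In a Play WebSocket handler, publisherSource could presumably be passed as the outgoing side of a Flow built with Flow.fromSinkAndSource.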