I'm sorry to bear bad news, but it looks like this is the explicit design of akka
with respect to . You cannot reuse the instance of the flow for all clients as you want to. The fanout must be "explicit" as a consequence of the Rx model.
Examples I have come across use a routee-specific Flow
:
// The flow from beginning to end to be passed into handleWebsocketMessages
def websocketDispatchFlow(sender: String): Flow[Message, Message, Unit] =
Flow[Message]
// First we convert the TextMessage to a ReceivedMessage
.collect { case TextMessage.Strict(msg) => ReceivedMessage(sender, msg) }
// Then we send the message to the dispatch actor which fans it out
.via(dispatchActorFlow(sender))
// The message is converted back to a TextMessage for serialization across the socket
.map { case ReceivedMessage(from, msg) => TextMessage.Strict(s"$from: $msg") }
def route =
(get & path("chat") & parameter('name)) { name =>
handleWebsocketMessages(websocketDispatchFlow(sender = name))
}
Here is a discussion on it:
And this is exactly what I don't like in Akka Stream, this explicit
fan-out. When I receive a data source from somewhere that I want to
process (e.g. Observable or a Source), I just want to subscribe to it
and I don't want to care on whether it's cold or hot or whether it's
been subscribed by other subscribers or not. This is my river analogy.
The river should not care about who drinks from it and the drinkers
should not care about the river's source or about how many other
drinkers there are. My sample, that is equivalent to the one Mathias
provided, does share the data-source, but it simply does reference
counting and you can have 2 subscribers or you can have 100, doesn't
matter. And here I've gotten fancy, but reference counting doesn't
work if you don't want to lose events or if you want to ensure that
the stream remains always-on. But then you use ConnectableObservable
which has connect(): Cancelable
and that's a perfect fit for say ...
a Play's LifeCycle Plugin. And underlying that you can use a
BehaviorSubject or a ReplaySubject if you want to repeat previous
values for new subscribers. And things just work after that, no manual
drawing of that connections graph needed.
...
... (this is from https://bionicspirit.com/blog/2015/09/06/monifu-vs-akka-streams.html)
...
For functions that take an Observable and return an Observable, we
indeed have lift, which is the closest thing to something that has a
name and can be used to great effect in Monifu for Subject
or other
Observable types because of the LiftOperators1 (and 2), which is what
makes it possible to transform Observables without losing their type -
this is an OOP-ish improvement over what RxJava does with lift
.
But, such functions are not equivalent to Processor
/ Subject
. The
difference is that Subject
is at the same time a consumer and a
producer. This means that subscribers do not get to control exactly
when the data-source starts and that the data-source is in essence
hot (meaning that multiple subscribers share the same data-source). In Rx it's totally fine if you model cold observables (meaning
observables that start a new data-source per each individual
subscriber). On the other hand in Rx (in general) it's not OK to have
data sources that can be subscribed only once and then that's it. The
only exception to this rule in Monifu are the Observables produced by
the GroupBy operator, but that's like the exception that confirms the
rule.
What this means, especially coupled with another restriction of the
contract of both Monifu and the Reactive Streams protocol (thou shall
not subscribe multiple times with the same consumer), is that a
Subject
or a Processor
instance is not reusable. In order for such
an instance to be reusable, the Rx model would need a factory of
Processor
. Furthermore this means that whenever you want to use a
Subject
/ Processor
, your data source must automatically be hot
(shareable between multiple subscribers).