13
votes

SharedFlow has just been introduced in coroutines 1.4.0-M1, and it is meant to replace all BroadcastChannel implementations (as stated in the design issue decription).

I have a use case where I use a BroadcastChannel to represent incoming web socket frames, so that multiple listeners can "subscribe" to the frames. The problem I have when I move to a SharedFlow is that I can't "end" the flow when I receive a close frame, or an upstream error (which I would like to do to inform all subscribers that the flow is over).

How can I make all subscriptions terminate when I want to effectively "close" the SharedFlow? Is there a way to tell the difference between normal closure and closure with exception? (like channels)

If MutableSharedFlow doesn't allow to convey the end of the flow to subscribers, what is the alternative if BroadcastChannel gets deprecated/removed?

2

2 Answers

7
votes

The SharedFlow documentation describes what you need:

Note that most terminal operators like Flow.toList would also not complete, when applied to a shared flow, but flow-truncating operators like Flow.take and Flow.takeWhile can be used on a shared flow to turn it into a completing one.

SharedFlow cannot be closed like BroadcastChannel and can never represent a failure. All errors and completion signals should be explicitly materialized if needed.

Basically you will need to introduce a special object that you can emit from the shared flow to indicate that the flow has ended, using takeWhile at the consumer end can make them emit until that special object is received.

-1
votes

I think a possible solution is creating a boolean flag isValid and publicly expose only flows with .takeWhile { isValid }. Then just call isValid = false and sFlow.emit() when you want to close all subscribers.

Possible implementation:

private var isValid = true // In real scenario use atomic boolean
private val _sharedFlow = MutableSharedFlow<Unit>()
val sharedFlow: Flow<Unit> get() = _sharedFlow.takeWhile { isValid }

suspend fun cancelSharedFlow() {
    isValid = false
    _sharedFlow.emit(Unit)
}

EDIT: In my case .emit() was always suspending so I had to use BufferOverflow.DROP_LATEST (which is not suitable for many usecases). Not sure if the problem is in this example or elsewhere in my app. If you see a problem, please comment :)