5
votes

I've read similar topics but couldn't find a proper answer:

In my Repository class I have a cold Flow that I want to share with 2 Presenters/ViewModels, so my choice is to use the shareIn operator.

Let's take a look at the Android docs' example:

val latestNews: Flow<List<ArticleHeadline>> = flow {
    ...
}.shareIn(
    externalScope,  // e.g. CoroutineScope(Dispatchers.IO)?
    replay = 1,
    started = SharingStarted.WhileSubscribed()
)

What the docs suggest for the externalScope parameter:

A CoroutineScope that is used to share the flow. This scope should live longer than any consumer to keep the shared flow alive as long as needed.

However, when looking for an answer on how to stop subscribing to a Flow, the most-voted answer in the 2nd link says:

A solution is not to cancel the flow, but the scope it's launched in.

For me, these answers are contradictory in SharedFlow's case. And unfortunately my Presenter/ViewModel still receives the newest data even after its onCleared was called.

How can I prevent that? This is an example of how I consume this Flow in my Presenter/ViewModel:

fun doSomethingUseful(): Flow<OtherModel> {
    // assuming OtherModel has a constructor that takes the list of headlines
    return repository.latestNews.map { OtherModel(it) }
}

If this might help, I'm using MVI architecture so doSomethingUseful reacts to some intents created by the user.

3
A SharedFlow is a publisher, and its subscribers run in scopes that are independent of the publisher's scope. To be effective as a shared flow, it should outlive its subscribers, which can cancel independently without cancelling the publisher, since it lives in a different scope. The docs do a good job of describing this. I suggest you set up some simple tests to check the behaviour. In RxJava terms, this shared flow is similar to a ConnectableObservable. – Mark Keen
Okay, I understand that these scopes are independent of each other. I've also made my question more specific by adding my use case for consuming the flow. The problem is that this doSomethingUseful flow doesn't explicitly have a scope. The only scope I see is located in my BasePresenter/BaseViewModel class, which subscribes to all intents (MVI-specific behaviour). Should I cancel it then? – adek111

3 Answers

2
votes

I have tried to provide a minimal example with relevant comments. As stated, SharedFlow works very similarly to a ConnectableObservable in RxJava. Upstream is subscribed to only once, meaning the computation is done only once for a cold upstream flow. Your repository does nothing by itself, as it is a cold flow that is never "collected" until the SharedFlow subscribes, so it doesn't have a scope of its own.
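
To make the "subscribed to only once" point concrete, here is a small plain-Kotlin sketch (no Android) that counts how often the upstream block runs for two concurrent collectors, with and without shareIn. The function name upstreamRuns and the sharingJob scope are made up for illustration; only kotlinx.coroutines is assumed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns how many times the upstream block ran for two concurrent collectors.
fun upstreamRuns(share: Boolean): Int = runBlocking {
    var runs = 0
    val sharingJob = Job() // hypothetical stand-in for the repository's scope
    val cold = flow {
        runs++              // executed once per upstream collection
        emit("headline")
        awaitCancellation() // stay active like a long-lived source
    }
    val source: Flow<String> =
        if (share) {
            cold.shareIn(
                scope = CoroutineScope(coroutineContext + sharingJob),
                started = SharingStarted.WhileSubscribed(),
                replay = 1
            )
        } else {
            cold
        }

    val collectors = List(2) { launch { source.collect { } } }
    delay(100)                                 // let both collectors receive the value
    collectors.forEach { it.cancelAndJoin() }
    sharingJob.cancelAndJoin()
    runs
}

fun main() {
    println("cold flow, 2 collectors: ${upstreamRuns(share = false)} upstream run(s)")
    println("shared flow, 2 collectors: ${upstreamRuns(share = true)} upstream run(s)")
}
```

The cold flow runs its block once per collector, while the shared flow runs it once and fans the value out to both.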

Having used both RxJava and Flow, I see many similarities. It almost seems unnecessary to have created separate Flow and Collector interfaces; extending the base Reactive Streams interfaces could have made the transition a lot easier for developers. I don't know the underlying reasons, though: maybe they wanted more flexibility with a new API, or wanted to stand out from being just another Reactive Streams implementation like the Java 9 implementation and RxJava.

class MyViewModel : ViewModel(), CoroutineScope {

    override val coroutineContext: CoroutineContext = Dispatchers.Main.immediate + SupervisorJob() // optional + CoroutineExceptionHandler()

    private val latestNews: Flow<List<String>> = doSomethingUseful()
            .flowOn(Dispatchers.IO) // upstream will operate on this dispatcher
            .shareIn(scope = this, // shared in this scope (or use viewModelScope) - becomes a hot flow for the lifetime of your view model and connects to doSomethingUseful only once for the lifetime of the scope
                     replay = 1,
                     started = SharingStarted.WhileSubscribed())


    fun connect() : Flow<List<String>> = latestNews // expose SharedFlow to "n" number of subscribers or same subscriber more than once

    override fun onCleared() {
        super.onCleared()
        cancel() // cancel the shared flow - this scope is finished
    }
}

class MainActivity : AppCompatActivity(), CoroutineScope {

    override val coroutineContext: CoroutineContext = Dispatchers.Main.immediate + SupervisorJob()

    private var job : Job? = null

    // supply the same view model instance on config changes for example - its scope is larger
    private val vm : MyViewModel by viewModels()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
    }

    override fun onStart() {
        super.onStart()

        job = launch {
            vm.connect().collect {
                // observe latest emission of hot flow and subsequent emissions if any - either reconnect or connect for first time
            }
        }
    }

    override fun onStop() {
        super.onStop()

        // cancel the job, but latestNews is still "alive" and keeps receiving emissions because it runs in the longer-lived ViewModel scope
        job?.cancel()
    }

    override fun onDestroy() {
        super.onDestroy()
        // completely cancel this scope - the ViewModel scope is unaffected
        cancel()
    }
}
2
votes

Thanks to Mark Keen's comments and post I think I managed to get a satisfactory result.

I've understood that the scope passed to shareIn doesn't have to be the same scope that my consumer operates in. Changing the scope in BasePresenter/BaseViewModel from a custom CoroutineScope to viewModelScope seems to solve the main problem. You don't even need to cancel this scope manually, as stated in the Android docs:

init {
    viewModelScope.launch {
        // Coroutine that will be canceled when the ViewModel is cleared.
    }
}
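
A rough plain-Kotlin sketch of that separation (no Android involved): sharingScope stands in for the repository's externalScope and consumerScope stands in for viewModelScope. Both names, and the function cancelConsumerOnly, are hypothetical. Cancelling only the consumer scope stops collection while the sharing scope stays alive.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns (values received after the consumer scope was cancelled, sharing scope still active?).
fun cancelConsumerOnly(): Pair<Int, Boolean> = runBlocking {
    // Hypothetical stand-ins for the repository scope and viewModelScope.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val consumerScope = CoroutineScope(coroutineContext + Job())

    val latestNews = flow {
        var i = 0
        while (true) {
            emit(i++)
            delay(10)
        }
    }.shareIn(sharingScope, SharingStarted.WhileSubscribed(), replay = 1)

    var received = 0
    consumerScope.launch { latestNews.collect { received++ } }
    delay(100)

    consumerScope.cancel() // roughly what happens to viewModelScope in onCleared()
    delay(20)              // give the cancellation time to complete
    val atCancel = received
    delay(100)
    val afterCancel = received - atCancel

    val sharingAlive = sharingScope.isActive
    sharingScope.cancel()
    afterCancel to sharingAlive
}

fun main() {
    val (afterCancel, sharingAlive) = cancelConsumerOnly()
    println("received after consumer cancel: $afterCancel, sharing scope active: $sharingAlive")
}
```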

Just keep in mind that the default viewModelScope dispatcher is Main, which is not obvious and might not be what you want! To change the dispatcher, use viewModelScope.launch(YourDispatcher).

What is more, my hot SharedFlow is transformed from another cold Flow that is created with callbackFlow on top of a callback API (which is based on the Channels API - this is complicated...)

After changing the collection scope to viewModelScope, I was getting a "ChildCancelledException: Child of the scoped flow was cancelled" exception when emitting new data from that API. This problem is well documented in both issues on GitHub:

As stated, there is a subtle difference between emission using offer and send:

offer is for non-suspending context, while send is for suspending ones.

offer is, unfortunately, non-symmetric to send in terms of propagated exceptions (CancellationException from send is usually ignored, while CancellationException from offer in non-suspending context is not).

We hope to fix it in #974 either with offerOrClosed or changing offer semantics

As of Kotlin Coroutines 1.4.2, #974 is not fixed yet. I hope it will be in the near future, to avoid unexpected CancellationExceptions.
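
For readers on newer versions: kotlinx.coroutines 1.5.0 and later provide trySend, which returns a ChannelResult instead of throwing, and deprecate offer in its favour. Below is a minimal sketch of a callbackFlow bridge written that way. NewsApi, NewsListener and collectHeadlines are hypothetical stand-ins for the callback API and its consumer, not part of any library.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical callback-based API, used only for illustration.
fun interface NewsListener {
    fun onNews(headline: String)
}

class NewsApi {
    private val listeners = mutableSetOf<NewsListener>()
    fun register(l: NewsListener) { listeners += l }
    fun unregister(l: NewsListener) { listeners -= l }
    fun publish(headline: String) = listeners.toList().forEach { it.onNews(headline) }
}

fun NewsApi.headlines(): Flow<String> = callbackFlow {
    val listener = NewsListener { headline ->
        // Non-suspending; failure is reported through the returned ChannelResult.
        trySend(headline)
    }
    register(listener)
    // Runs when the collector is cancelled or the channel is closed.
    awaitClose { unregister(listener) }
}

fun collectHeadlines(): List<String> = runBlocking {
    val api = NewsApi()
    val received = mutableListOf<String>()
    val collector = launch { api.headlines().collect { received += it } }
    delay(50)                    // let the collector register its listener
    api.publish("first")
    api.publish("second")
    delay(50)                    // let the collector drain the buffer
    collector.cancelAndJoin()    // triggers awaitClose, which unregisters the listener
    api.publish("after cancel")  // no listener is left, so nothing is delivered
    received
}

fun main() {
    println(collectHeadlines())
}
```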

Lastly, I recommend experimenting with the started parameter of the shareIn operator. After all these changes, I had to switch from WhileSubscribed() to Lazily in my use case.
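
To illustrate the difference between the two strategies, here is a small sketch that counts how many times upstream is started when one collector comes and goes, followed by a second one. The function name upstreamStarts and the sharingJob scope are made up for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Counts upstream starts while two collectors subscribe one after another.
fun upstreamStarts(started: SharingStarted): Int = runBlocking {
    var starts = 0
    val sharingJob = Job() // hypothetical stand-in for the scope passed to shareIn
    val shared = flow {
        starts++
        emit("news")
        awaitCancellation() // keep upstream active until it is cancelled
    }.shareIn(CoroutineScope(coroutineContext + sharingJob), started, replay = 1)

    repeat(2) {
        val collector = launch { shared.collect { } }
        delay(50)                  // collector is subscribed
        collector.cancelAndJoin()
        delay(50)                  // no subscribers for a while
    }
    sharingJob.cancelAndJoin()
    starts
}

fun main() {
    println("WhileSubscribed: ${upstreamStarts(SharingStarted.WhileSubscribed())} start(s)")
    println("Lazily: ${upstreamStarts(SharingStarted.Lazily)} start(s)")
}
```

With WhileSubscribed() upstream is stopped once the last subscriber leaves and restarted for the next one; with Lazily it starts on the first subscriber and keeps running until the sharing scope is cancelled.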

I will update this post if I find any new information. Hopefully my research will save someone some time.

0
votes

Use SharedFlow. In the example below, I emit a value from one fragment and collect it in another.

ViewModel:

class MenuOptionsViewModel : ViewModel() {
    private val _option = MutableSharedFlow<String>()
    val option = _option.asSharedFlow()

    suspend fun setOption(o: String) {
        _option.emit(o)
    }
}
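
One thing to watch with this setup: MutableSharedFlow() defaults to replay = 0, so a value emitted while no collector is subscribed is dropped. A plain-Kotlin sketch of that behaviour follows. MenuOptions is a hypothetical stand-in for the ViewModel above without the Android dependency, and collectedOptions is made up for the demo.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Plain-Kotlin stand-in for MenuOptionsViewModel (hypothetical, no Android dependency).
class MenuOptions {
    private val _option = MutableSharedFlow<String>() // replay = 0 by default
    val option = _option.asSharedFlow()

    suspend fun setOption(o: String) {
        _option.emit(o)
    }
}

fun collectedOptions(): List<String> = runBlocking {
    val menu = MenuOptions()
    val collected = mutableListOf<String>()

    menu.setOption("too early")   // no subscriber yet, so this value is dropped
    val collector = launch { menu.option.collect { collected += it } }
    delay(50)                     // let the collector subscribe
    menu.setOption("delivered")
    delay(50)
    collector.cancelAndJoin()
    collected
}

fun main() {
    println(collectedOptions())
}
```

If the collecting fragment may start after the emission, passing a replay value to MutableSharedFlow is one option to consider.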

Fragment Emitting values:

class BottomSheetOptionsFragment : BottomSheetDialogFragment(), KodeinAware {

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        menuViewModel = activity?.run {
            ViewModelProviders.of(this).get(MenuOptionsViewModel::class.java)
        } ?: throw Exception("Invalid Activity")

        listViewOptions.adapter = ArrayAdapter<String>(
            requireContext(),
            R.layout.menu_text_item,
            options
        )

        listViewOptions.setOnItemClickListener { adapterView, view, i, l ->
            val entry: String = listViewOptions.getAdapter().getItem(i) as String

            // here we are emitting values
            GlobalScope.launch { menuViewModel.setOption(entry) }
            Log.d(TAG, "emitting flow $entry")
            dismiss()
        }
    }
}

Fragment Collecting values:

class DetailFragment : BaseFragment(), View.OnClickListener, KodeinAware,
    OnItemClickListener {

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        menuViewModel = activity?.run {
            ViewModelProviders.of(this).get(MenuOptionsViewModel::class.java)
        } ?: throw Exception("Invalid Activity")

        // collecting values
        lifecycleScope.launchWhenStarted {
            menuViewModel.option.collect {
                Log.d(TAG, "collecting flow $it")
            }
        }
    }
}