21
votes

Update Coroutines 1.3.0-RC

Working version:

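@FlowPreview
fun streamTest(): Flow<String> = channelFlow {
    // Forward each callback result into the flow's channel.
    listener.onSomeResult { result ->
        // offer() can throw once the channel is closed, so check first.
        if (!isClosedForSend) {
            offer(result)
        }
    }

    // Suspends until the flow is cancelled or closed, then unsubscribes.
    awaitClose {
        listener.unsubscribe()
    }
}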

Also check out this Medium article by Roman Elizarov: Callbacks and Kotlin Flows
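For readers on newer releases, here is a minimal, self-contained sketch of the same pattern. It assumes kotlinx.coroutines 1.5 or later, where trySend replaced the now-deprecated offer. FakeListener is a hypothetical stand-in for the real listener; it replays a fixed list of values as soon as a callback is registered, so that the example runs on its own.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical listener: replays its values as soon as a callback registers.
class FakeListener(private val values: List<String>) {
    var unsubscribed = false
        private set

    fun onSomeResult(callback: (String) -> Unit) = values.forEach(callback)

    fun unsubscribe() {
        unsubscribed = true
    }
}

fun streamTest(listener: FakeListener): Flow<String> = callbackFlow {
    listener.onSomeResult { result ->
        // trySend does not throw on a closed channel; it reports failure
        // through its return value, so no isClosedForSend check is needed.
        trySend(result)
    }
    // Runs when the collector stops or is cancelled.
    awaitClose { listener.unsubscribe() }
}

fun main() = runBlocking {
    val listener = FakeListener(listOf("a", "b", "c"))
    // take(2) ends collection early, which triggers the awaitClose block.
    val received = streamTest(listener).take(2).toList()
    println(received)              // [a, b]
    println(listener.unsubscribed) // true
}
```

The collector never calls unsubscribe itself. Ending collection early (here with take) is enough for the awaitClose block to run.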

Original Question

I have a Flow emitting multiple Strings:

@FlowPreview
suspend fun streamTest(): Flow<String> = flowViaChannel { channel ->
    listener.onSomeResult { result ->
        if (!channel.isClosedForSend) {
            channel.sendBlocking(result)
        }
    }
}

After some time I want to unsubscribe from the stream. Currently I do the following:

viewModelScope.launch {
    beaconService.streamTest().collect {
        Timber.i("stream value $it")
        if(it == "someString")
            // Here the coroutine gets canceled, but streamTest is still executed
            this.cancel() 
    }
}

If the coroutine gets canceled, the stream is still executed. There is just no subscriber listening to new values. How can I unsubscribe and stop the stream function?

4
I think this question is the same as stackoverflow.com/questions/59680533/… - ininmm
Thanks for the comment. It's not exactly the same. My question was how my flow emitter can detect if the flow is no longer needed and it can unsubscribe from a listener. - devz
There are extension functions which allow you to cancel the scope from within your launch {} block. You should now be able to safely call cancel() as you have it in your sample code if you use Kotlin 1.3.7+. See answer here: stackoverflow.com/a/65121663/1738090 - w3bshark

4 Answers

29
votes

A solution is to cancel not the flow itself but the coroutine it is collected in, using the Job that launch returns.

val job = scope.launch { flow.collect { } }
job.cancel()
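A slightly fuller sketch of the same idea, runnable on its own. The endless flow and the upstreamStopped flag are made up for illustration; the flag only makes it visible that cancelling the collecting job also stops the upstream code.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns true if the flow body was stopped by cancelling the collector's job.
suspend fun cancelCollector(): Boolean = coroutineScope {
    var upstreamStopped = false
    val endless = flow {
        try {
            var i = 0
            while (true) {
                emit(i++)
                delay(10)
            }
        } finally {
            // Reached when the collecting coroutine is cancelled.
            upstreamStopped = true
        }
    }

    val job = launch { endless.collect { } }
    delay(50)            // let the collector run for a while
    job.cancelAndJoin()  // cancel and wait until the flow body has finished
    upstreamStopped
}

fun main() = runBlocking {
    println(cancelCollector()) // true
}
```

cancelAndJoin is used instead of a plain cancel() so that the result is read only after the flow's cleanup has finished.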
3
votes

With the current version of coroutines / Flows (1.2.x) I don't know of a good solution. With onCompletion you are informed when the flow stops, but by then you are outside the streamTest function, so it is hard to stop listening for new events from there.

beaconService.streamTest().onCompletion {

}.collect {
    ...
}
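To illustrate what onCompletion does and does not give you, here is a small sketch with a made-up helper, describeCompletion. The callback sees only whether the flow ended normally or with an exception; it has no access to anything declared inside the flow builder, such as a listener.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reports how the given flow ended, as seen by onCompletion.
suspend fun describeCompletion(source: Flow<Int>): String {
    var outcome = "onCompletion not called"
    source
        .onCompletion { cause ->
            // cause is null on normal completion, otherwise the exception.
            outcome = cause?.javaClass?.simpleName ?: "completed normally"
        }
        .catch { /* swallow the upstream failure for this demo */ }
        .collect { }
    return outcome
}

fun main() = runBlocking {
    println(describeCompletion(flowOf(1, 2, 3)))
    // prints "completed normally"
    println(describeCompletion(flow { emit(1); throw IllegalStateException("boom") }))
    // prints "IllegalStateException"
}
```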

With the next version of coroutines (1.3.x) it will be really easy. The function flowViaChannel is deprecated in favor of channelFlow. This function lets you wait for the flow to be closed and do something at that moment, e.g. remove a listener:

channelFlow<String> {
    println("Subscribe to listener")

    awaitClose {
        println("Unsubscribe from listener")
    }
}
3
votes

You could use the takeWhile operator on Flow.

flow.takeWhile { it != "someString" }.collect { emittedValue ->
    // Do stuff until the predicate is false
}
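A runnable sketch of this, using a fixed flowOf source in place of the real stream (the source values and the untilSentinel helper are made up for illustration):

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Collects values until the sentinel "someString" shows up.
suspend fun untilSentinel(source: Flow<String>): List<String> =
    source.takeWhile { it != "someString" }.toList()

fun main() = runBlocking {
    val source = flowOf("a", "b", "someString", "c")
    println(untilSentinel(source)) // [a, b]
}
```

Note that the sentinel value itself is not delivered to the collector, and nothing after it is collected. If the sentinel should be processed as well, transformWhile may be a better fit.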
1
votes

When a flow is collected inside a coroutine scope, launch returns a Job that you can use to stop the subscription.

// Make member variable if you want.
var jobForCancel : Job? = null

// Begin collecting
jobForCancel = viewModelScope.launch {
    beaconService.streamTest().collect {
        Timber.i("stream value $it")
        if (it == "someString") {
            // Don't call this.cancel() here; cancel the job from outside instead.
        }
    }
}

// Call whenever you want to cancel collection
jobForCancel?.cancel()