11
votes

I'm struggling to create a 'takeUntilSignal' operator for a Flow - an extension method that will cancel a flow when another flow generates an output.

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T>

My initial effort was to try to launch collection of the signal flow in the same coroutine scope as the primary flow collection, and cancel the coroutine scope:

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> = flow {
    kotlinx.coroutines.withContext(coroutineContext) {
        launch {
            signal.take(1).collect()
            println("signalled")
            cancel()
        }
        collect {
            emit(it)
        }
    }
}

But this isn't working (and uses the forbidden "withContext" method that is expressly stubbed out by Flow to prevent usage).

edit I've kludged together the following abomination, which doesn't quite fit the definition (resulting flow will only cancel after first emission from primary flow), and I get the feeling there's a far better way out there:

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> =
    combine(
        signal.map { it as Any? }.onStart { emit(null) }
    ) { x, y -> x to y }
        .takeWhile { it.second == null }
        .map { it.first }

edit2 another try, using channelFlow:

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> =
    channelFlow {
        launch {
            signal.take(1).collect()
            println("hello!")
            close()
        }
        collect { send(it) }
        close()
    }
1
Why does the signal have to be a Flow? If it was a Deferred (a natural choice for the single-element case), you'd have the isCompleted flag to check in each iteration of collect.Marko Topolnik
Hi Marko - that's an interesting idea! My specific use case is that I have a dialog box - fields in the dialog box bind to flows, and I want to cancel these flows when the dialog box closes. These flows won't emit when the dialog box closes, leaving these collections in a kind of limbo state. I can store the jobs for each collect and cancel all of them when the closed signal is received, but that's a lot of book-keeping.Andy
Maybe you misunderstood me, I didn't imply an async job but a plain CompletableDeferred which you explicitly complete with a value. Closing the dialog box would complete it and all the field-flows would check it and close themselves when it completes.Marko Topolnik
Hi Marko - if I undertand right it would be something like collect { if (signal.isCompleted) throw CancellationException() else emit(it) }Andy
Yes, for example first() works the same way, by throwing an exception: try { collect { value -> result = value; throw AbortFlowException() } } catch (e: AbortFlowException) { // Do nothing } Marko Topolnik

1 Answers

7
votes

Use couroutineScope and start the new coroutine inside:

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> = flow {
    try {
        coroutineScope {
            launch {
                signal.take(1).collect()
                println("signalled")
                this@coroutineScope.cancel()
            }

            collect {
                emit(it)
            }
        }

    } catch (e: CancellationException) {
        //ignore
    }
}