
I am experimenting with the actor coroutine builder in Kotlin. I wrote the code below to send a message to an actor and receive a reply from it.

package com.byteobject.prototype.kotlin

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.consumeEach

class GreetingsMessage(val to: String, val greetings: CompletableDeferred<String>)

fun CoroutineScope.newGreeter(greet: String) = actor<GreetingsMessage> {
    channel.consumeEach {
        it.greetings.complete("$greet ${it.to}")
    }
}

fun main() {
    runBlocking {
        val greeter = newGreeter("Hello")
        val greetingsMessage = GreetingsMessage("World", CompletableDeferred())
        launch(Dispatchers.Default) {
            greeter.send(greetingsMessage)
        }
        launch(Dispatchers.Default) {
            println(greetingsMessage.greetings.await())
            greeter.close()
        }
    }
}

This code works as expected, but the version below does not: it hangs the program.

package com.byteobject.prototype.kotlin

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.consumeEach

class GreetingsMessage(val to: String, val greetings: CompletableDeferred<String>)

suspend fun newGreeter(greet: String) = coroutineScope {
    actor<GreetingsMessage> {
        channel.consumeEach {
            it.greetings.complete("$greet ${it.to}")
        }
    }
}

fun main() {
    runBlocking {
        val greeter = newGreeter("Hello")
        val greetingsMessage = GreetingsMessage("World", CompletableDeferred())
        launch(Dispatchers.Default) {
            greeter.send(greetingsMessage)
        }
        launch(Dispatchers.Default) {
            println(greetingsMessage.greetings.await())
            greeter.close()
        }
    }
}

The only change is that newGreeter is now a suspending function whose body is wrapped in coroutineScope. With that change, the call to newGreeter never returns and the program hangs. I expected newGreeter written as an extension function on CoroutineScope and newGreeter written as a suspending function wrapped in coroutineScope to behave exactly the same.

I want to know the difference between the two approaches and why the second one hangs the program.

I tried the same thing with the produce builder. There, too, the call to the suspending function that should return the ReceiveChannel never returns, whereas the same produce call written as an extension function works as expected.

This code does not block:

package com.byteobject.prototype.kotlin

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun CoroutineScope.produceIntegers(n: Int) = produce<Int> {
    for (i in 1..n)
        send(i)
    close()
}

fun main() {
    runBlocking {
        val intChan = produceIntegers(10)
        launch {
            for (i in intChan)
                println(i)
        }
    }
}

whereas this version blocks on the call to produceIntegers:

package com.byteobject.prototype.kotlin

import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun produceIntegers(n: Int) = coroutineScope {
    produce<Int> {
        for (i in 1..n)
            send(i)
        close()
    }
}

fun main() {
    runBlocking {
        val intChan = produceIntegers(10)
        launch {
            for (i in intChan)
                println(i)
        }
    }
}

1 Answer


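The problem is that coroutineScope { } creates a new scope and does not return until its block and every coroutine started inside it have completed (structured concurrency). In your suspending versions, the actor or producer is a child of that scope. The actor only finishes once its channel is closed, but greeter.close() runs only after newGreeter has returned. Likewise, the producer suspends on its first send because nothing can receive from the channel until produceIntegers has returned. In both cases the function waits for a child that can never finish, so the call never returns.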

See: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/coroutine-scope.html

"This function returns as soon as the given block and all its children coroutines are completed."
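
The waiting behaviour in that sentence can be seen in a small sketch. The function name demoCoroutineScopeWaits and the recorded event strings are made up for illustration and do not come from the question. The child coroutine is still running when the block itself ends, yet coroutineScope returns only after the child has finished.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: records the order in which things happen.
// Everything runs on the single runBlocking thread, so a plain list is safe here.
fun demoCoroutineScopeWaits(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    coroutineScope {
        launch {
            delay(100)                    // child outlives the block itself
            events += "child finished"
        }
        events += "block finished"        // the block ends while the child is still pending
    }
    events += "coroutineScope returned"   // reached only after the child has completed
    events
}

fun main() {
    // prints [block finished, child finished, coroutineScope returned]
    println(demoCoroutineScopeWaits())
}
```

If the child never completed, as with an actor whose channel is never closed, the last event would never be recorded.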

By contrast, the extension function simply uses the CoroutineScope it is called on (its receiver), so it starts the coroutine in that scope and returns immediately without waiting for it to finish.
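
If a suspending function is still wanted, one possible arrangement is to start, use and close the actor inside the same coroutineScope, so that no child is left running when the block ends. This is only a sketch: greetOnce is a hypothetical helper that is not part of the question, and the actor body uses a plain for loop over the channel instead of consumeEach.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

class GreetingsMessage(val to: String, val greetings: CompletableDeferred<String>)

// Same shape as the working version in the question:
// the actor becomes a child of whatever scope this is called on.
fun CoroutineScope.newGreeter(greet: String) = actor<GreetingsMessage> {
    for (message in channel) {
        message.greetings.complete("$greet ${message.to}")
    }
}

// Hypothetical helper (not in the question): the actor lives entirely
// inside this coroutineScope, so the scope has nothing left to wait for.
suspend fun greetOnce(greet: String, to: String): String = coroutineScope {
    val greeter = newGreeter(greet)
    val reply = CompletableDeferred<String>()
    greeter.send(GreetingsMessage(to, reply))
    val text = reply.await()
    greeter.close() // ends the actor's loop, which lets coroutineScope return
    text
}

fun main() {
    runBlocking {
        println(greetOnce("Hello", "World")) // prints "Hello World"
    }
}
```

The trade-off is that the actor cannot outlive the call. When the actor has to stay alive across several calls, the CoroutineScope extension form from the question is the better fit.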