I am trying to play with actor builder construct in kotlin. i have written the code below to send and receive a message from actor.
package com.byteobject.prototype.kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.consumeEach
class GreetingsMessage(val to: String, val greetings: CompletableDeferred<String>)
fun CoroutineScope.newGreeter(greet: String) = actor<GreetingsMessage> {
channel.consumeEach {
it.greetings.complete("$greet ${it.to}")
}
}
fun main() {
runBlocking {
val greeter = newGreeter("Hello")
val greetingsMessage = GreetingsMessage("World", CompletableDeferred())
launch(Dispatchers.Default) {
greeter.send(greetingsMessage)
}
launch(Dispatchers.Default) {
println(greetingsMessage.greetings.await())
greeter.close()
}
}
}
this code works as expected. but the code below is not, as it is hanging the program.
package com.byteobject.prototype.kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.consumeEach
class GreetingsMessage(val to: String, val greetings: CompletableDeferred<String>)
suspend fun newGreeter(greet: String) = coroutineScope {
actor<GreetingsMessage> {
channel.consumeEach {
it.greetings.complete("$greet ${it.to}")
}
}
}
fun main() {
runBlocking {
val greeter = newGreeter("Hello")
val greetingsMessage = GreetingsMessage("World", CompletableDeferred())
launch(Dispatchers.Default) {
greeter.send(greetingsMessage)
}
launch(Dispatchers.Default) {
println(greetingsMessage.greetings.await())
greeter.close()
}
}
}
with the slight modification to the code by making newGreeter function as suspending function and enclosing the function by coroutineScope the call to newGreeter method is blocking the thread and its hanging the program. I believe newGreeter as an extension function to CoroutineScope and as a suspending function enclosed inside coroutineScope should work exactly the same.
I want to know the difference between the two approach and why the second approach is hanging the program.
I tried the same thing with produce function and here also i found the call to suspend function to get ReceieveChannel is blocking the thread, where as the same produce construct used as an extension function is working as expected
this code is non blocking
package com.byteobject.prototype.kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun CoroutineScope.produceIntegers(n: Int) = produce<Int> {
for (i in 1..n)
send(i)
close()
}
fun main() {
runBlocking {
val intChan = produceIntegers(10)
launch {
for (i in intChan)
println(i)
}
}
}
where as this is blocking on the call to produceIntegers method
package com.byteobject.prototype.kotlin
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
suspend fun produceIntegers(n: Int) = coroutineScope {
produce<Int> {
for (i in 1..n)
send(i)
close()
}
}
fun main() {
runBlocking {
val intChan = produceIntegers(10)
launch {
for (i in intChan)
println(i)
}
}
}