2
votes

I'm not able to close channel when there is no knowledge about its
length

package main

import (
    "fmt"
    "time"
)

func gen(ch chan int) {
    var i int
    for {
        time.Sleep(time.Millisecond * 10)
        ch <- i
        i++
        // when no more data (e.g. from db, or event stream)
        if i > 100 {
            break
        }
    }

    // hot to close it properly?
    close(ch)
}

func receiver(ch chan int) {
    for i := range ch {
        fmt.Println("received:", i)
    }
}

func main() {
    ch := make(chan int)

    for i := 0; i < 10; i++ {
        go gen(ch)
    }

    receiver(ch)
}

It gives me error

panic: send on closed channel

goroutine 8 [running]:
main.gen(0xc82001a0c0)
    /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:12 +0x57
created by main.main
    /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:35 +0xbd

goroutine 1 [panicwait]:
runtime.gopark(0x0, 0x0, 0x50b8e0, 0x9, 0x10, 0x1)
    /usr/lib/go/src/runtime/proc.go:185 +0x163
runtime.main()
    /usr/lib/go/src/runtime/proc.go:121 +0x2f4
runtime.goexit()
    /usr/lib/go/src/runtime/asm_amd64.s:1696 +0x1

goroutine 6 [sleep]:
time.Sleep(0x989680)
    /usr/lib/go/src/runtime/time.go:59 +0xf9
main.gen(0xc82001a0c0)
    /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:11 +0x29
created by main.main
    /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:33 +0x79

goroutine 7 [sleep]:
time.Sleep(0x989680)
    /usr/lib/go/src/runtime/time.go:59 +0xf9
main.gen(0xc82001a0c0)
    /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:11 +0x29
created by main.main
    /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:34 +0x9b
exit status 2

It's logical - first goroutine closing channel when the second one tries to send to it. What will be the best approach to close channel in this situation?

2

2 Answers

18
votes

Once a channel is closed, you can't send further values on it else it panics. This is what you experience.

This is because you start multiple goroutines that use the same channel and they send values on it. And you close the channel in each of it. And since they are not synchronized, once the first goroutine reaches the point where it closes it, others may (and they will) still continue to send values on it: panic!

You can close the channel only once (attempting to close an already closed channel also panics). And you should do it when all the goroutines that send values on it are done. In order to do this, you need to detect when all the sender goroutines are done. An idiomatic way to detect this is to use sync.WaitGroup.

For each started sender goroutine we add 1 to the WaitGroup using WaitGroup.Add(). And each goroutine that is done sending the values can signal this by calling WaitGroup.Done(). Best to do this as a deferred statement, so if your goroutine would terminate abruptly (e.g. panics), WaitGroup.Done() would still be called, and would not leave other goroutines hanging (waiting for an absolution - a "missing" WaitGroup.Done() call that would never come...).

And WaitGroup.Wait() will wait until all sender goroutines are done, and only after this and only once will it close the channel. We want to detect this "global" done event and close the channel while processing the values sent on it is in progress, so we have to do this in its own goroutine.

The receiver goroutine will run until the channel is closed since we used the for ... range construct on the channel. And since it runs in the main goroutine, the program will not exit until all the values are properly received and processed from the channel. The for ... range construct loops until all the values are received that were sent before the channel was closed.

Note that the solution below works with buffered and unbuffered channel too without modification (try using a buffered channel with ch := make(chan int, 100)).

Correct solution (try it on the Go Playground):

func gen(ch chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    var i int
    for {
        time.Sleep(time.Millisecond * 10)
        ch <- i
        i++
        // when no more data (e.g. from db, or event stream)
        if i > 100 {
            break
        }
    }
}

func receiver(ch chan int) {
    for i := range ch {
        fmt.Println("received:", i)
    }
}

func main() {
    ch := make(chan int)
    wg := &sync.WaitGroup{}

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go gen(ch, wg)
    }

    go func() {
        wg.Wait()
        close(ch)
    }()

    receiver(ch)
}

Note:

Note that it's important that receiver(ch) runs in the main goroutine, and the code what waits for the WaitGroup and closes the channel in its own (non-main) goroutine; and not the other way around. If you would switch these 2, it might cause an "early exit", that is not all values might be received and processed from the channel. The reason for this is because a Go program exits when the main goroutine finishes (spec: Program execution). It does not wait for other (non-main) goroutines to finish. So if waiting and closing the channel would be in the main goroutine, after closing the channel the program could exit at any moment, not waiting for the other goroutine that in this case would loop to receive values from the channel.

0
votes

"One general principle of using Go channels is don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders."

Every channel will be GCed eventually once it is marked for cleanup, so it is okay to leave channel un-closed the only difference it will make is that that channel will be available for gc after a few cycles maybe if not closed explicitly.

Nevertheless it is always good if you can close the channel off. Please go through the following links for detailed explaination.

Articles this and this shows various ways to close a channel in case of 1:N, N:1 or M:N (senders:receivers)