2
votes

I've been trying to solve a simple problem I ran into with Go concurrency. I've searched for solutions but found nothing specific to my problem (or I may have missed one). Here's my code:

package main

import (
    "fmt"
    "time"
)

func producer(ch chan int, d time.Duration, num int) {

    for i:=0; i<num; i++ {
        ch <- i
        time.Sleep(d)
    }
}

func main() {
    ch := make(chan int)

    go producer(ch, 100*time.Millisecond, 2)
    go producer(ch, 200*time.Millisecond, 5)

    for {
        fmt.Println(<-ch)    
    }

    close(ch)
}

It prints error:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
    D:/Code/go/src/testconcurrency/main.go:23 +0xca
exit status 2

What is an efficient way to avoid this error? Thank you.

5
You can go through this question. - Himanshu
Your problem is that the goroutines send integers on the channel and the main goroutine reads them. When both producers finish, no more integers are sent, but the main goroutine keeps waiting for a value that will never arrive. Try closing the channel with close(ch) once it is no longer used. - user9335240
I did the close(ch). - Jake Muller
You closed it after the place that causes the deadlock. The infinite for loop keeps receiving from the channel, waiting for an integer to be sent. After both producers finish, you are still waiting for more while nothing is produced. The close(ch) must go at the end of the producer (but in your case that is difficult because you have 2 producers). - user9335240
First try deleting one producer and running the code: the same crash happens. Then move close(ch) to the end of the producer routine and it will succeed. With 2 producers you may need another channel or some other mechanism; you need a better design. - user9335240

5 Answers

6
votes

You have producers which are "short-lived", they only send values on the channel for a finite amount of time, and you have an endless for loop which receives values from the channel endlessly, without a termination condition, and the channel is only closed after this endless loop. Once the producers stop sending values, it's a deadlock.

Channels must be closed by the producer(s), signalling that no more values will be sent on it. Since you have multiple producers without synchronization (producers are not synchronized with each other), in general you can't tell which one will finish first, so you can't designate one to close the channel (and a channel can only be closed once, see Why Go's channel can close twice?; and Closing channel of unknown length).
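
To illustrate why no two producers may both close the channel, here is a small sketch. The function name `closeTwice` is made up for this example; it recovers the runtime panic so the program can report it instead of crashing.

```go
package main

import "fmt"

// closeTwice (hypothetical helper) closes the same channel twice and
// returns whatever value the second close panicked with.
func closeTwice() (recovered interface{}) {
	defer func() { recovered = recover() }()
	ch := make(chan int)
	close(ch)
	close(ch) // closing an already closed channel panics
	return nil
}

func main() {
	if r := closeTwice(); r != nil {
		fmt.Println("recovered from panic:", r)
	}
}
```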

You have to "coordinate" the producers, and when all have finished their jobs, the coordinator should close the channel.

And the consumer should use a for range on the channel, as the for range construct receives all values from the channel that were sent on it before it was closed, then it terminates automatically.
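
As a rough illustration of what for range does on a channel, the sketch below spells the loop out with the two-value receive form. `drain` is a hypothetical helper name; a buffered channel is used only so the example needs no extra goroutine.

```go
package main

import "fmt"

// drain (hypothetical helper) behaves like "for v := range ch":
// it receives until the channel is closed and empty.
func drain(ch <-chan int) []int {
	var out []int
	for {
		v, ok := <-ch
		if !ok {
			// ok is false only once the channel is closed and drained.
			return out
		}
		out = append(out, v)
	}
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch)

	fmt.Println(drain(ch)) // prints [1 2 3]: values sent before close are still delivered

	v, ok := <-ch
	fmt.Println(v, ok) // prints 0 false: a closed, empty channel never blocks
}
```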

For the coordination it is recommended to use sync.WaitGroup. Whether you use a global one in this case or a local one and you pass it to producers is up to you. Using a local will make the solution more general and easier to extend. One thing to note is that you must pass a pointer to sync.WaitGroup. Whenever you spin up a new producer, increment the waitgroup using WaitGroup.Add(). When a producer is done, it can signal this using WaitGroup.Done(), preferably using defer (so it runs no matter what, mitigating the deadlock in case of abnormal circumstances). And the controller can wait for all producers to finish using WaitGroup.Wait().

Here's a complete solution:

package main

import (
    "fmt"
    "sync"
    "time"
)

func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
    defer wg.Done()

    for i := 0; i < num; i++ {
        ch <- i
        time.Sleep(d)
    }
}

func main() {
    wg := &sync.WaitGroup{}
    ch := make(chan int)

    wg.Add(1)
    go producer(ch, 100*time.Millisecond, 2, wg)
    wg.Add(1)
    go producer(ch, 200*time.Millisecond, 5, wg)

    go func() {
        wg.Wait()
        close(ch)
    }()

    for v := range ch {
        fmt.Println(v)
    }
}

Output (try it on the Go Playground):

0
0
1
1
2
3
4

See related question: Prevent the main() function from terminating before goroutines finish in Golang

5
votes

This problem can be solved in an elegant way using two wait groups. By closing channel ch we signal to the consumers that there is no more data.

The solution scales well with more consumers.

package main

import (
    "fmt"
    "sync"
    "time"
)

func producer(ch chan<- int, d time.Duration, num int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < num; i++ {
        ch <- i
        time.Sleep(d)
    }
}

func consumer(ch <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for x := range ch {
        fmt.Println(x)
    }
}

func main() {
    ch := make(chan int)
    producers := &sync.WaitGroup{}
    consumers := &sync.WaitGroup{}

    producers.Add(2)
    go producer(ch, 100*time.Millisecond, 2, producers)
    go producer(ch, 200*time.Millisecond, 5, producers)

    consumers.Add(1)
    go consumer(ch, consumers)

    producers.Wait()
    close(ch)
    consumers.Wait()
}

3
votes

The problem is that <-ch blocks, so if no new values are sent on the channel it blocks forever. One way is to replace it with a select statement, which also blocks but can listen on multiple channels. You would also have to add an exit channel. In your example, as soon as the exit channel has received two values we can break. The break statement needs a label because we want to exit both the select and the for loop.

https://play.golang.org/p/wGdCulZDnrx

Another way is to have multiple input channels and close each one as soon as its producer has finished sending. For this, each goroutine needs its own channel, otherwise we would exit when the first goroutine finishes.

A third option is to create a merge function which merges multiple channels into one. This allows for moving the creation of the channels into the producers, so they are created, filled and closed in one location. The merge function is relatively complex, but it is removed from the business logic code and can be understood and tested separately. The main code is then reduced to just:

ch1 := producer(100*time.Millisecond, 2)
ch2 := producer(200*time.Millisecond, 5)

for i := range merge(ch1, ch2) {
    fmt.Println(i)
}

https://play.golang.org/p/2mv8ILhJPIB

merge func is from https://blog.golang.org/pipelines

2
votes

You need to synchronize your goroutines. The main goroutine and the producer goroutines run independently, so the main goroutine never knows when to stop receiving from the channel. Because it loops over the channel, it keeps trying to receive; once the producers have finished and nothing more is sent, it can never get another value, and the program deadlocks. To avoid this, use sync.WaitGroup to wait for the producers and then close the channel.

Here's the code:

package main

import (
    "fmt"
    "time"
    "sync"
)

func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
    // Deferred first so completion is signalled on every exit path.
    defer wg.Done()
    for i := 0; i < num; i++ {
        ch <- i
        time.Sleep(d)
    }
}

func main() {
    wg := &sync.WaitGroup{}
    ch := make(chan int)

    wg.Add(2)
    go producer(ch, 100*time.Millisecond, 2, wg)
    go producer(ch, 200*time.Millisecond, 5, wg)

    // close the channel once both producers are done, which ends the range loop below
    go func() {
        wg.Wait()
        close(ch)
    }()

    // print the outputs
    for i := range ch {
        fmt.Println(i)
    }
}

https://play.golang.org/p/euMTGTIs83g

Hope it helps.

Since my solution looks similar to one that was already posted, I have changed it back to my original answer, from before I modified it to suit the OP's question.

Here's the code:

package main

import (
    "fmt"
    "time"
    "sync"
)

// producer produces values to be sent to the consumer
func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < num; i++ {
        ch <- i
        time.Sleep(d)
    }
}

// consumer forwards all values from the producers to out
func consumer(ch chan int, out chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := range ch {
        out <- i
    }
}

// synchronizer closes each channel once its senders are done, to avoid deadlocks
func synchronizer(ch chan int, out chan int, wgp *sync.WaitGroup, wgc *sync.WaitGroup) {
    wgp.Wait()
    close(ch)
    wgc.Wait()
    close(out)
}

func main() {
    wgp := &sync.WaitGroup{}
    wgc := &sync.WaitGroup{}
    ch := make(chan int)
    out := make(chan int)

    wgp.Add(2)
    go producer(ch, 100*time.Millisecond, 2, wgp)
    go producer(ch, 200*time.Millisecond, 5, wgp)

    wgc.Add(1)
    go consumer(ch, out, wgc)

    go synchronizer(ch, out, wgp, wgc)

    // print the outputs
    for i := range out {
        fmt.Println(i)
    }
}

This uses a consumer goroutine to fan in the values from the producer goroutines; main then reads all values from the consumer's output channel.

Hope it helps.

0
votes

A simpler answer: one of the producers closes the channel, and the consumer can just range over the channel.

package main

import (
    "fmt"
    "time"
)

func producer(ch chan int, d time.Duration, num int, closer bool) {

    for i:=0; i<num; i++ {
        ch <- i
        time.Sleep(d)   
    }
    if closer {
        close(ch)
    }
}

func main() {
    ch := make(chan int)

    go producer(ch, 100*time.Millisecond, 2, false)
    go producer(ch, 200*time.Millisecond, 5, true)

    for i := range ch {
        fmt.Println(i)
    }

}

Of course, unless you have a situation where you know which producer will always finish last, you would not want to do this in real code. Better designs are in the WaitGroup-based patterns in the other answers. But this is the simplest way for this code to avoid deadlock.
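
To make the risk concrete: if the closing producer finished first, the other producer would send on a closed channel, and that panics. A minimal sketch of that failure follows; the helper `trySend` is hypothetical, and the channel is buffered only to keep the example free of extra goroutines.

```go
package main

import "fmt"

// trySend (hypothetical helper) reports whether sending v on ch panicked.
func trySend(ch chan<- int, v int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch <- v
	return false
}

func main() {
	ch := make(chan int, 1)

	fmt.Println(trySend(ch, 1)) // prints false: the channel is still open

	close(ch) // stands in for the "closer" producer finishing early

	fmt.Println(trySend(ch, 2)) // prints true: sending on a closed channel panics
}
```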