1
votes

In the following code I am trying to spawn MaxOutstanding handlers. Each handler loops over the items in the queue, prints them, and writes true to the done channel.

In my main function, I start the handlers, write 9 elements to the queue, and wait for the first element to be written to the done channel.

package main


import "fmt"

type Request struct {
        int32
}
var MaxOutstanding = 5
func handle(queue chan *Request, i int, done chan bool) {
    for r := range queue {
        fmt.Println(i, "---", r)
        done <- true
    }
}

func Serve(clientRequests chan *Request, quit, done chan bool) {
    // Start handlers
    for i := 0; i < MaxOutstanding; i++ {
        go handle(clientRequests, i, done)
    }
    <-quit  // Wait to be told to exit.
}


func main() {
    clientRequests := make(chan *Request)
    quit := make(chan bool)
    done := make(chan bool)

    go Serve(clientRequests, quit, done)

    clientRequests <- &Request{4}
    clientRequests <- &Request{1}
    clientRequests <- &Request{2}
    clientRequests <- &Request{3}

    clientRequests <- &Request{5}
    clientRequests <- &Request{6}
    clientRequests <- &Request{7}
    clientRequests <- &Request{8}
    clientRequests <- &Request{9}
    fmt.Println( "...........>", <- done )
    close(clientRequests)
    close(done)
}

On execution I get the following error. I don't see what's wrong with the implementation; I am even closing the channels.

4 --- &{4}
0 --- &{1}
1 --- &{2}
2 --- &{3}
3 --- &{5}
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        /home/ubuntu/digs-svc/src/digs/go1.go:45 +0x251

goroutine 5 [chan receive]:
main.Serve(0xc82004c060, 0xc82004c0c0, 0xc82004c120)
        /home/ubuntu/digs-svc/src/digs/go1.go:28 +0x92
created by main.main
        /home/ubuntu/digs-svc/src/digs/go1.go:37 +0xb9

goroutine 6 [chan send]:
main.handle(0xc82004c060, 0x0, 0xc82004c120)
        /home/ubuntu/digs-svc/src/digs/go1.go:16 +0x23b
created by main.Serve
        /home/ubuntu/digs-svc/src/digs/go1.go:25 +0x5b

EDIT:

Apparently, the fmt.Println("....", <-done) call came too late to act as a consumer of the done channel, so I moved it up in the execution order. On an unbuffered channel, a consumer needs to be "listening" when data is written. In my earlier code, there was no consumer when the first value was written, so the handlers blocked on the send and main never got past its sixth request.

Working code.

https://play.golang.org/p/98l2M4XO9t
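
The linked code is not reproduced here. As a rough sketch of the idea described in the edit (not necessarily what the link contains), a receiver on done can be started before any request is sent, so a handler's send always finds a consumer. The names watchDone, run and maxOutstanding are made up for this illustration, and the watcher goroutine is deliberately left running when run returns.

```go
package main

import "fmt"

type Request struct {
	int32
}

// maxOutstanding is a hypothetical stand-in for MaxOutstanding in the question.
const maxOutstanding = 5

func handle(queue <-chan *Request, i int, done chan<- bool) {
	for r := range queue {
		fmt.Println(i, "---", r)
		done <- true // never blocks for long: watchDone is always receiving
	}
}

// watchDone (hypothetical helper) starts receiving from done right away.
// It forwards the first value on the returned channel and keeps draining
// the rest so that no handler gets stuck on its send.
func watchDone(done <-chan bool) <-chan bool {
	first := make(chan bool, 1) // buffered, so forwarding never blocks
	go func() {
		first <- <-done
		for range done {
			// discard later completions
		}
	}()
	return first
}

// run sends n requests (n >= 1 assumed) and returns the first value
// written to done.
func run(n int32) bool {
	clientRequests := make(chan *Request)
	done := make(chan bool)

	first := watchDone(done) // the consumer exists before any send
	for i := 0; i < maxOutstanding; i++ {
		go handle(clientRequests, i, done)
	}
	for i := int32(1); i <= n; i++ {
		clientRequests <- &Request{i}
	}

	v := <-first
	close(clientRequests)
	return v
}

func main() {
	fmt.Println("...........>", run(9))
}
```

done is intentionally never closed in this sketch, because handlers may still be sending on it when run returns.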

2
At first sight you write to done at least 5 times, but read from it only once. – myaut
@myaut, that is the intention. I am only interested in the first instance of data being written to done. – Pratyush

2 Answers

1
votes

You're blocking the iteration over the channel in your handle function with the send on the done channel, because nothing is receiving on the other side.
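
A small sketch of that blocking behavior, using a non-blocking send to probe whether a send could proceed. The helper trySend is a made-up name for illustration and is not part of the question's code.

```go
package main

import "fmt"

// trySend (hypothetical helper) attempts a send without blocking and
// reports whether it went through. On an unbuffered channel a send can
// only proceed when a receiver is already waiting.
func trySend(ch chan bool) bool {
	select {
	case ch <- true:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan bool)
	fmt.Println("unbuffered, no receiver:", trySend(unbuffered)) // false

	buffered := make(chan bool, 1)
	fmt.Println("buffered, empty:", trySend(buffered)) // true
	fmt.Println("buffered, full:", trySend(buffered))  // false
}
```

A plain `done <- true` in the same situation as the first case simply waits, which is what each handler in the question ends up doing.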

Those extra channels aren't really doing anything. You could add a sync.WaitGroup to synchronize the handlers' exit and remove the done channel, which allows each handler to keep receiving from the queue.

func handle(queue chan *Request, i int, wg *sync.WaitGroup) {
    defer wg.Done()

    for r := range queue {
        fmt.Println(i, "---", r)
    }
}

func Serve(clientRequests chan *Request, wg *sync.WaitGroup) {
    // Start handlers
    for i := 0; i < MaxOutstanding; i++ {
        wg.Add(1)
        go handle(clientRequests, i, wg)

    }
}

func main() {
    clientRequests := make(chan *Request)
    var wg sync.WaitGroup

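    // Serve no longer blocks, so call it directly. This also guarantees
    // that every wg.Add has happened before wg.Wait is reached.
    Serve(clientRequests, &wg)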

    for i := int32(0); i < 50; i++ {
        clientRequests <- &Request{i}
    }

    close(clientRequests)
    wg.Wait()
}

https://play.golang.org/p/oUFjZONjhk (note that in the playground, this example seems to currently favor a single goroutine being the receiver. Normally the blocked goroutines will receive randomly, and you can see that behavior if you compile and run normally)
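
If a signal for the first handled request is still wanted, as the comments on the question suggest, one possible variation is to pair the WaitGroup with a sync.Once that closes a channel. This is only a sketch under that assumption; run, first, handled and maxOutstanding are names invented for the example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Request struct {
	int32
}

// maxOutstanding is a hypothetical stand-in for MaxOutstanding.
const maxOutstanding = 5

// run sends n requests to maxOutstanding handlers, waits for the first
// completion, then waits for all handlers to exit. It returns how many
// requests were handled.
func run(n int32) int64 {
	queue := make(chan *Request)
	first := make(chan struct{}) // closed once, after the first request is handled
	var (
		wg      sync.WaitGroup
		once    sync.Once
		handled int64
	)

	for i := 0; i < maxOutstanding; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for r := range queue {
				fmt.Println(i, "---", r)
				atomic.AddInt64(&handled, 1)
				// Closing a channel never blocks, so handlers keep going.
				once.Do(func() { close(first) })
			}
		}(i)
	}

	for i := int32(0); i < n; i++ {
		queue <- &Request{i}
	}
	if n > 0 {
		<-first // at least one request has been handled
		fmt.Println("...........> first request handled")
	}

	close(queue)
	wg.Wait()
	return atomic.LoadInt64(&handled)
}

func main() {
	fmt.Println("handled:", run(9))
}
```

Because the signal is a channel close rather than a send, it needs no receiver to be ready and can be observed by any number of waiters.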

1
votes

Only MaxOutstanding (5) handlers are started, and each one blocks on done <- true after handling its first request, because main has not reached <-done yet. Only five requests are therefore ever received, and the sixth send in main blocks forever since no handler is left to receive it.

To avoid this, send no more requests than there are handlers, for example inside a for loop:

for i := 0; i < MaxOutstanding; i++ {
        clientRequests <- &Request{int32(i)}
}

Here is the working code:

package main

import (
    "fmt"
)

type Request struct {
    int32
}

var MaxOutstanding = 10

func handle(queue chan *Request, i int, done chan bool) {
    for r := range queue {
        fmt.Println(i, "---", r)
        done <- true
    }
}

func Serve(clientRequests chan *Request, quit, done chan bool) {
    // Start handlers
    for i := 0; i < MaxOutstanding; i++ {
        go handle(clientRequests, i, done)
    }
    <-quit // Wait to be told to exit.
}

func main() {
    clientRequests := make(chan *Request)
    quit := make(chan bool)
    done := make(chan bool)

    go Serve(clientRequests, quit, done)
    for i := 0; i < MaxOutstanding; i++ {
        clientRequests <- &Request{int32(i)}
    }

    fmt.Println("...........>", <-done)
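    close(clientRequests)
    // done is not closed here: the remaining handlers are still blocked
    // sending on it, and a send on a closed channel panics.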
}

https://play.golang.org/p/L5Y2YoFNvz