0
votes

I have a server which, upon receiving a request, needs to use goroutines to read messages from different streams and send them to the parent goroutine, which aggregates the messages and sends them to the client. So it looks like this:

Client --> API --> handler --> worker1
                          |--> worker2
                          |--> worker3

I use different channels for communicating between all these goroutines: the workers write to the aggregated channel, and the handler (parent goroutine) receives from it and sends the messages back to the client, so everything works, ALMOST :)

The problem is that my workers never receive the message that the handler sends on the channel, so the worker goroutines never stop, and I cannot find out why they don't receive the messages.

Since the code is huge, I have only included the relevant parts.

Note: loglistenmgm, the channel whose message the workers need to receive, has a buffer the same size as the number of workers, BUT if I remove the buffer, the info log after writing true to it never gets printed (which means no goroutine is listening), I only see this in the logs, and the logs hang there:

INFO[0010] Request.Context().Done triggered             
INFO[0010] **Sending true event to loglistenmgm: 0

My handler:

func handler() {
        logStream := make(chan string)
        done := make(chan bool, 5)
        loglistenmgm := make(chan bool, numberOfPods)
        errorStream := make(chan error)

        for _, pod := range pods {
            go getOnePodLogs(pod, logStream, errorStream, loglistenmgm, done)
        }

        go func() {
            for {
                select {
                case <-c.Request.Context().Done():
                    log.Info("Request.Context().Done triggered")
                    for i := 0; i < numberOfPods; i++ {
                        log.Info("**Sending true event to loglistenmgm: " + strconv.Itoa(i))
                        loglistenmgm <- true
                        log.Info("**After sending true event to loglistenmgm") // This gets printed so the true has been sent to the channel
                    }
                    done <- true
                    return
                case err := <-errorStream:
                    c.JSON(http.StatusInternalServerError, map[string]string{
                        "message": "Internal Server Error: " + err.Error(),
                    })
                    for i := 0; i < numberOfPods; i++ {
                        loglistenmgm <- true
                    }
                    done <- true
                    return
                }
            }
        }()

        isStreaming := c.Stream(func(w io.Writer) bool {
            for {
                select {
                case <-done:
                    c.SSEvent("end", "end") // This also works properly and it can print the "stream closed" which happens after this
                    return false
                case msg := <-logStream:
                    c.Render(-1, sse.Event{
                        Event: "message",
                        Data:  msg,
                    })
                    return true
                }
            }
        })
        if !isStreaming {
            log.Info("stream closed")
        }
}

My workers:

func getOnePodLogs(pod string, logStream chan string, errorStream chan error, loglistenmgm chan bool, done chan bool) {
    stream, err := podLogRequest.Stream()
    if err != nil {
        log.Error(err.Error())
        errorStream <- err
        return
    }
    defer stream.Close()

    for {
        select {
        case <-loglistenmgm:
            log.Info(pod + " stop listening to logs") // this log line never gets triggered
            return
        default:
            buf := make([]byte, 1000)
            numBytes, err := stream.Read(buf)
            if numBytes == 0 {
                log.Info(pod + ": numBytes == 0 --> End of log")
                done <- true
                return
            }
            if err == io.EOF {
                log.Info("io.EOF")
                return
            }
            if err != nil {
                log.Error("Error getting stream.Read(buf)")
                log.Error(err)
                return
            }
            message := string(buf[:numBytes])
            logStream <- message // This works and I can receive the message on handler and it can pass it to the client
        }
    }

}
It's hard to tell from the example here, but the first thing I would do is look at a stack trace when you are in the unexpected state and see where everything is blocked. Using a bunch of channels like this is always likely to lead to synchronization bugs between goroutines. I suggest you work on simplifying this design and avoid trying to "signal" between goroutines via channels like done and loglistenmgm, and prefer things like WaitGroup and Context to control the goroutines. - JimB
I know exactly where the problem is, but I don't know why it happens or how to solve it. The problem is that the handler (parent goroutine) sends true to loglistenmgm but the workers don't receive any message on loglistenmgm. I will look into the Context. - AVarf
We know the message is not lost because channels work, so something must be in a state you are not expecting, which is the problem I'm trying to describe. The most likely candidate is that you are blocked in stream.Read, which means you will never be able to get out of the default case to check for a message on loglistenmgm. - JimB
Probably you are right regarding stream.Read, since the frequency of the messages on that stream is not that high. I tried sync.WaitGroup (I added all the required code) but it didn't work, and in the end it just blocked the handler goroutine too. Is there any other way to kill these workers? - AVarf
sync.WaitGroup is for waiting for goroutines to return; it's not going to fix your problem. How to stop the goroutine depends entirely on what podLogRequest.Stream() is. - JimB

1 Answer

1
votes

Thanks to https://stackoverflow.com/users/32880/jimb, who pointed out that the problem is stream.Read blocking the select, I found the problem and solved it.

stream.Read reads from an io.ReadCloser that delivers data infrequently, so when I send the true event on the loglistenmgm channel, the worker is blocked waiting for a new message in stream.Read and never gets to read from the loglistenmgm channel. I confirmed this by sending a message to the stream minutes after the handler was closed: only then were the workers able to read from the channel and exit.
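
In other words, simplified from the worker code in the question, the loop only checks loglistenmgm at the top of each iteration, and it spends almost all of its time parked inside Read:

for {
    select {
    case <-loglistenmgm: // only reachable between two reads
        return
    default:
        // Blocks until the pod writes another log line; while the goroutine
        // is parked here, the true sent on loglistenmgm is never received.
        numBytes, err := stream.Read(buf)
        if numBytes == 0 || err != nil {
            return
        }
        logStream <- string(buf[:numBytes])
    }
}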

I solved the problem by restructuring the program: instead of creating the streams in the workers, I create them in the handler and pass them to the workers. When the handler is done, it closes all the streams, which triggers the numBytes == 0 branch and shuts down the workers.
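
A minimal sketch of that restructuring, reusing the identifiers from the question (newPodLogRequest is a placeholder for however the per-pod log request is actually built; error handling and the c.Stream loop are trimmed):

func handler() {
    logStream := make(chan string)
    done := make(chan bool, numberOfPods+1)

    // The handler now owns the streams, so it controls their lifetime.
    streams := make([]io.ReadCloser, 0, numberOfPods)
    for _, pod := range pods {
        stream, err := newPodLogRequest(pod).Stream() // placeholder for the real per-pod request
        if err != nil {
            log.Error(err.Error())
            continue
        }
        streams = append(streams, stream)
        go getOnePodLogs(pod, stream, logStream, done)
    }

    go func() {
        <-c.Request.Context().Done()
        log.Info("Request.Context().Done triggered")
        // Closing the streams unblocks every worker sitting in stream.Read:
        // the next Read returns 0 bytes / an error and the worker exits.
        for _, s := range streams {
            s.Close()
        }
        done <- true
    }()

    // ... the c.Stream(...) loop from the question stays the same ...
}

// The worker no longer needs loglistenmgm; it reads until its stream ends or is closed.
func getOnePodLogs(pod string, stream io.Reader, logStream chan string, done chan bool) {
    buf := make([]byte, 1000)
    for {
        numBytes, err := stream.Read(buf)
        if numBytes == 0 || err != nil {
            log.Info(pod + ": end of log")
            done <- true
            return
        }
        logStream <- string(buf[:numBytes])
    }
}

(As in the original code, the sends on logStream assume the handler keeps draining that channel until it receives done.)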