I have a server which, upon receiving a request, needs to use goroutines to read messages from different streams and send them to the parent goroutine; the parent goroutine then aggregates the messages and sends them to the client. So it looks like this:
```
Client --> API --> handler --> worker1
                           |--> worker2
                           |--> worker3
```
I use different channels for communication between all these goroutines: the workers write to an aggregated channel, and the handler (parent goroutine) receives from it and sends the messages back to the client, so everything works, ALMOST :)
The problem is that my workers never receive the message the handler sends on the channel, so the worker goroutines never stop, and I cannot figure out why they don't receive it.
Since the code is huge, I've only included the relevant parts.
Note: `loglistenmgm`, the channel whose message the workers need to receive, has a buffer the same size as the number of workers. BUT if I remove the buffer, the info log after writing `true` to it never gets printed (which means no goroutine is listening), and the logs hang here:
```
INFO[0010] Request.Context().Done triggered
INFO[0010] **Sending true event to loglistenmgm: 0
```
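The behavior described in the note can be reproduced in isolation. The following minimal, standalone sketch (not part of the original program) shows that a send on a buffered channel with free capacity returns immediately, while a send on an unbuffered channel blocks until another goroutine is ready to receive:
```go
package main

import "fmt"

func main() {
	buffered := make(chan bool, 1)
	buffered <- true // returns immediately: the value just sits in the buffer
	fmt.Println("after buffered send")

	unbuffered := make(chan bool)
	go func() { <-unbuffered }() // without this receiver, the next send would block forever
	unbuffered <- true           // blocks until the receiver above is ready
	fmt.Println("after unbuffered send")
}
```
That is exactly why the buffered sends in the handler appear to succeed even though no worker ever reads them: the values just sit in the buffer.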
My handler:
```go
func handler() {
	logStream := make(chan string)
	done := make(chan bool, 5)
	loglistenmgm := make(chan bool, numberOfPods)
	errorStream := make(chan error)

	for _, pod := range pods {
		go getOnePodLogs(logStream, errorStream, loglistenmgm, done)
	}

	go func() {
		for {
			select {
			case <-c.Request.Context().Done():
				log.Info("Request.Context().Done triggered")
				for i := 0; i < numberOfPods; i++ {
					log.Info("**Sending true event to loglistenmgm: " + strconv.Itoa(i))
					loglistenmgm <- true
					log.Info("**After sending true event to loglistenmgm") // this gets printed, so the true has been sent to the channel
				}
				done <- true
				return
			case err := <-errorStream:
				c.JSON(http.StatusInternalServerError, map[string]string{
					"message": "Internal Server Error: " + err.Error(),
				})
				for i := 0; i < numberOfPods; i++ {
					loglistenmgm <- true
				}
				done <- true
				return
			}
		}
	}()

	isStreaming := c.Stream(func(w io.Writer) bool {
		for {
			select {
			case <-done:
				c.SSEvent("end", "end") // this also works properly; "stream closed" below is printed after it
				return false
			case msg := <-logStream:
				c.Render(-1, sse.Event{
					Event: "message",
					Data:  msg,
				})
				return true
			}
		}
	})
	if !isStreaming {
		log.Info("stream closed")
	}
}
```
My workers:
```go
func getOnePodLogs(logStream chan string, errorStream chan error, loglistenmgm chan bool, done chan bool) {
	stream, err := podLogRequest.Stream()
	if err != nil {
		log.Error(err.Error())
		errorStream <- err
		return
	}
	defer stream.Close() // deferred only after the error check, so a nil stream is never closed

	for {
		select {
		case <-loglistenmgm:
			log.Info(pod + " stop listening to logs") // this log line never gets triggered
			return
		default:
			buf := make([]byte, 1000)
			numBytes, err := stream.Read(buf)
			if numBytes == 0 {
				log.Info(pod + ": numBytes == 0 --> End of log")
				done <- true
				return
			}
			if err == io.EOF {
				log.Info("io.EOF")
				return
			}
			if err != nil {
				log.Error("Error getting stream.Read(buf)")
				log.Error(err)
				return
			}
			message := string(buf[:numBytes])
			logStream <- message // this works: the handler receives the message and passes it to the client
		}
	}
}
```
Comments:

- "[…] `done` and `loglistenmgm`, and prefer things like `WaitGroup` and `Context` to control the goroutines." - JimB
- "[…] `true` to `loglistenmgm`, but the workers don't receive any message on `loglistenmgm`. I will look into the Context." - AVarf
- "[…] `stream.Read`, which means you will never be able to get out of the `default` case to check for a message on `loglistenmgm`." - JimB
- "[…] `stream.Read`, since the frequency of the messages in that stream is not that high. I tried `sync.WaitGroup` (I added all the required code) but it didn't work, and in the end it just blocked the handler goroutine too. Is there any other way to kill these workers?" - AVarf
- "`sync.WaitGroup` is to wait for goroutines to return; it's not going to fix your problem. How to stop the goroutine is entirely dependent on what `podLogRequest.Stream()` is." - JimB
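The last two comments point at the actual cause: each worker sits blocked inside `stream.Read`, so its `select` never gets a chance to check `loglistenmgm`. A common way out, sketched below under the assumption that `podLogRequest.Stream()` returns an `io.ReadCloser` (as Kubernetes client-go's `rest.Request.Stream` does), is to pass the request's `Context` into each worker and close the stream when the context is cancelled; the forced `Close` makes the blocked `Read` return an error, and the worker exits. The `openStream` parameter here is a hypothetical stand-in for `podLogRequest.Stream`:
```go
package worker

import (
	"context"
	"io"
)

// getOnePodLogs is a context-driven sketch of the worker. openStream is a
// hypothetical stand-in for podLogRequest.Stream from the question; it is
// assumed to return an io.ReadCloser.
func getOnePodLogs(ctx context.Context, openStream func() (io.ReadCloser, error),
	logStream chan<- string, errorStream chan<- error) {
	stream, err := openStream()
	if err != nil {
		select {
		case errorStream <- err:
		case <-ctx.Done():
		}
		return
	}
	defer stream.Close()

	// Closing the stream when the context is cancelled forces any
	// in-flight stream.Read to return, so no control channel is needed.
	go func() {
		<-ctx.Done()
		stream.Close()
	}()

	buf := make([]byte, 1000)
	for {
		numBytes, err := stream.Read(buf) // blocks until data, EOF, or Close
		if numBytes > 0 {
			// Watch the context here too, so a send on logStream cannot
			// block forever once the handler has stopped receiving.
			select {
			case logStream <- string(buf[:numBytes]):
			case <-ctx.Done():
				return
			}
		}
		if err != nil { // io.EOF at end of log, or the error caused by Close
			return
		}
	}
}
```
With this shape the handler would start each worker with something like `go getOnePodLogs(c.Request.Context(), podLogRequest.Stream, logStream, errorStream)`, and the `loglistenmgm` channel plus its per-worker bookkeeping can be dropped entirely.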