1
votes

I need a way to signal from one main goroutine, an unknown number of other goroutines, multiple times. I also need for those other goroutines to select on multiple items, so busy waiting is (probably) not an option. I have come up with the following solution:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type signal struct {
    data     []int
    channels []chan struct{}
}

func newSignal() *signal {
    s := &signal{
        data:     make([]int, 0),
        channels: make([]chan struct{}, 1),
    }
    s.channels[0] = make(chan struct{})
    return s
}

func (s *signal) Broadcast(d int) {
    s.data = append(s.data, d)
    s.channels = append(s.channels, make(chan struct{}))
    close(s.channels[len(s.data)-1])
}

func test(s *signal, wg *sync.WaitGroup, id int, ctx context.Context) {
    for i := 0; ; i += 1 {
        select {
        case <-s.channels[i]:
            if id >= s.data[i] {
                fmt.Println("Goroutine completed:", id)
                wg.Done()
                return
            }
        case <-ctx.Done():
            fmt.Println("Goroutine completed:", id)
            wg.Done()
            return
        }
    }
}

func main() {
    s := newSignal()

    ctx, cancel := context.WithCancel(context.Background())
    wg := sync.WaitGroup{}
    wg.Add(3)
    go test(s, &wg, 3, ctx)
    go test(s, &wg, 2, ctx)
    go test(s, &wg, 1, ctx)

    s.Broadcast(3)
    time.Sleep(1 * time.Second)

    // multiple broadcasts is mandatory
    s.Broadcast(2)
    time.Sleep(1 * time.Second)

    // last goroutine
    cancel()

    wg.Wait()
}

Playground: https://play.golang.org/p/dGmlkTuj7Ty

Is there a more elegant way to do this? One that uses builtin libraries only. If not, is this a safe/ok to use solution? I believe it is safe at least, as it works for a large number of goroutines (I have done some testing with it).

To be concise, here is exactly what I want:

  • The main goroutine (call it M) must be able to signal with some data (call it d) some unknown number of other goroutines (call them n for 0...n), multiple times, with each goroutine taking an action based on d each time
  • M must be able to signal all of the other n goroutines with certain (numerical) data, multiple times
  • Every goroutine in n will either terminate on its own (based on a context) or after doing some operation with d and deciding its fate. It will be performing this check as many times as signaled until it dies.
  • I am not allowed to keep track of the n goroutines in any way (eg. having a map of channels to goroutines and iterating)

In my solution, the slices of channels do not represent goroutines: they actually represent signals that are being broadcast out. This means that if I broadcast twice, and then a goroutine spins up, it will check both signals before sleeping in the select block.

1
This is not safe, at least in the sense that test function will wait until you close the 0th channel. It will only detect closing of channels in order. If you can describe what exactly you want to achieve, better ways of doing it can be suggested.Burak Serdar
I think what you have described is actually a feature that I need (that's why I implemented it in this somewhat bizarre way). I will add more details shortlyAsad-ullah Khan
If I am not mistaken, I believe that code is doing the opposite of what I am trying to do: it is listening to n goroutines from 1 goroutine. I am trying to broadcast from 1 goroutine to n goroutines. The solutions describe what is basically a channel multiplexer, which might also work (I will try it soon). Apologies in advance if my analysis is wrong, I am not super familiar with this stuff yet (I have also edited my answer with more details)Asad-ullah Khan
You are keeping track of all the signals sent now. You have to keep track of something.Burak Serdar

1 Answers

1
votes

It seems to me that you might want something like a fan-out pattern. Here's one source describing fan-in and fan-out, among other concurrency patterns. Here's a blog post on golang.org about this too. I think it's essentially a version of observer pattern using channels.

Basically, you want something, say Broadcaster, that keeps a list of channels. When you call Broadcaster.send(data), it loops over the list of channels sending data on each channel. Broadcaster must also have a way for goroutines to subscribe to Broadcaster. Goroutines must have a way to accept a channel from Broadcasteror give a channel to Broadcaster. That channel is the communication link.

If the work to be performed in the "observer" goroutines will take long, consider using buffered channels so that Broadcaster is not blocking during send and waiting on goroutines. If you don't care if a goroutine misses a data, you can use a non-blocking send (see below).

When a goroutine "dies", it can unsubscribe from Broadcaster which will remove the appropriate channel from its list. Or the channel can just remain full, and Broadcaster will have to use a non-blocking send to skip over full channels to dead goroutines.

I can't say what I described is comprehensive or 100% correct. It's just a quick description of the first general things I'd try based on your problem statement.