You need a global map structure, and you can create a new chan for each connection. What I came up with is an intermediate channel that handles all access to the global map structure.
An example for server streaming:
    func (s *server) Subscribe(req *pb.SubscribeRequest, srv pb.SubscribeServer) error {
        // Use the trace ID, a generated random string, or anything else that identifies this goroutine.
        ID := "randomString"
        // Create a chan to receive response messages.
        conn := make(chan *pb.SubscribeResponse)
        // s.broadcast is the intermediate channel; the goroutine reading from it owns the map.
        s.broadcast <- &entity.BroadcastPayload{
            // A unique identifier.
            ID: ID,
            // The chan that corresponds to the ID.
            Conn: conn,
            // The event: add a connection, remove a connection, or send a message to the broadcast channel.
            Event: EventEnum.AddConnection,
        }
        for {
            select {
            case <-srv.Context().Done():
                s.broadcast <- &entity.BroadcastPayload{
                    ID:    ID,
                    Event: EventEnum.RemoveConnection,
                }
                return nil
            case response := <-conn:
                // Named st so the variable does not shadow the status package.
                if st, ok := status.FromError(srv.Send(response)); ok {
                    switch st.Code() {
                    case codes.OK:
                        // noop
                    case codes.Unavailable, codes.Canceled, codes.DeadlineExceeded:
                        return nil
                    default:
                        return nil
                    }
                }
            }
        }
    }
For the broadcast goroutine:
    // This goroutine owns the map[string]chan *pb.SubscribeResponse.
    go func() {
        for v := range s.broadcast {
            // Do something based on the event.
            switch v.Event {
            // Add the ID and conn to the map.
            case EventEnum.AddConnection:
                ...
            // Delete the map key and close the conn channel here.
            case EventEnum.RemoveConnection:
                ...
            // Receive a message from the business logic and send it to the suitable conn(s) in the map.
            case EventEnum.ReceiveResponse:
                ...
            }
        }
    }()
I put some details here
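To see the ownership idea in isolation, here is a minimal sketch without gRPC. Everything in it is an assumption made for illustration: `payload`, `runHub`, and the lowercase event constants are hypothetical stand-ins for `BroadcastPayload`, the broadcast goroutine, and `EventEnum`, and a plain `string` replaces `*pb.SubscribeResponse`. It also assumes buffered per-connection channels and a non-blocking send, which is one possible choice and not something the code above prescribes.

```go
package main

import "fmt"

// event tells the hub goroutine what to do with a payload.
type event int

const (
	addConnection event = iota
	removeConnection
	receiveResponse
)

// payload is a hypothetical stand-in for BroadcastPayload; Msg is a plain
// string here instead of *pb.SubscribeResponse.
type payload struct {
	ID    string
	Conn  chan string
	Event event
	Msg   string
}

// runHub is the only goroutine that touches the map, so no mutex is needed.
// It closes done once the broadcast channel is closed and drained.
func runHub(broadcast <-chan payload, done chan<- struct{}) {
	conns := make(map[string]chan string)
	for p := range broadcast {
		switch p.Event {
		case addConnection:
			conns[p.ID] = p.Conn
		case removeConnection:
			if c, ok := conns[p.ID]; ok {
				delete(conns, p.ID)
				close(c)
			}
		case receiveResponse:
			for _, c := range conns {
				select {
				case c <- p.Msg:
				default:
					// Assumption: drop the message when a subscriber's buffer
					// is full, so one slow subscriber cannot block the hub.
				}
			}
		}
	}
	close(done)
}

func main() {
	broadcast := make(chan payload)
	done := make(chan struct{})
	go runHub(broadcast, done)

	a := make(chan string, 1)
	b := make(chan string, 1)
	broadcast <- payload{ID: "a", Conn: a, Event: addConnection}
	broadcast <- payload{ID: "b", Conn: b, Event: addConnection}
	broadcast <- payload{Event: receiveResponse, Msg: "hello"}
	broadcast <- payload{ID: "a", Event: removeConnection}
	close(broadcast)
	<-done

	fmt.Println(<-a, <-b) // prints "hello hello"
}
```

The non-blocking send matters because a handler that is busy sending `removeConnection` to the hub is not reading from its own channel at that moment. With an unbuffered channel and a blocking send, the hub and the handler could end up waiting on each other.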