
I am getting started with gRPC in Go. I have read the official docs and a few examples.

Most examples don't identify the client; they just use the stream to read and write data. I see that there is an API on the Context to retrieve auth info, so I can identify the client for a ChatRequest. But what if I want to keep a reference to (or an index of) each stream, keyed by client ID?

For example,

Say I have 3 users in a chat room. I define the RPC as follows (it could be server-streaming as well):

rpc Chat(stream ChatRequest) returns (stream ChatResponse) {}

Say one user sends a message to the group, and it needs to be delivered to the other 2. If I have to send it through the streams currently open for those users, how safe is it to keep a reference to each stream?

The implementation would look something like this:

type chatServiceServer struct {
    // keep a map of subscribers / users currently connected; protect with mutex
}

func (s *chatServiceServer) Chat(stream pb.ChatService_ChatServer) error {
    // md, ok := metadata.FromIncomingContext(stream.Context())
    // p, ok := peer.FromContext(stream.Context())
    // ... identify client from above

    for {
        // save the message to DB
        // find the other users in the chat room who are currently connected
        // if connected, stream.Send(m) on their stream
        // else notify ....
    }
}

But I see this warning in the API docs and wonder whether there is a better way.

https://godoc.org/google.golang.org/grpc#ServerStream
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines.
SendMsg(m interface{}) error

A similar use case arises with any kind of subscription (events, ...), where I need to notify specific clients based on client ID. Any example code or article would be great as well.

Thank you


1 Answer


It should be safe to save the stream for use elsewhere, but only one goroutine may call SendMsg on a given stream at a time (the same limitation applies to RecvMsg, independently). So if you do this in your method handler:

for {
  // Recv takes no arguments; it returns the next message and an error.
  req, err := stream.Recv()
  if err != nil {
    return err
  }
  for _, s := range allStreams[req.ID] {
    if err := s.Send(req.Message); err != nil { /* remove s from allStreams */ }
  }
}

Then the call to s.Send must be guarded by a lock, because multiple of these handlers may be running concurrently. (Also allStreams is assumed to be a map[ID][]stream, in which case it must also be guarded by a lock.)
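
To make the locking concrete, here is a minimal sketch of one way to arrange it, not a definitive implementation. It does not import gRPC: the sender interface is a hypothetical stand-in for the generated pb.ChatService_ChatServer (only Send is needed), and hub, lockedStream, broadcast and recorder are names made up for illustration. One mutex protects the map and each stream gets its own mutex, so concurrent handlers never call Send on the same stream at the same time.

```go
package main

import (
	"errors"
	"sync"
)

// sender is a hypothetical stand-in for the generated stream type;
// in a real server this would be pb.ChatService_ChatServer.
type sender interface {
	Send(msg string) error
}

// lockedStream serializes Send calls on a single stream.
type lockedStream struct {
	mu sync.Mutex
	s  sender
}

func (l *lockedStream) Send(msg string) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.s.Send(msg)
}

// hub indexes the currently connected streams by client ID.
type hub struct {
	mu      sync.RWMutex
	streams map[string]*lockedStream
}

func newHub() *hub {
	return &hub{streams: make(map[string]*lockedStream)}
}

// add registers a stream; call it at the start of the handler.
func (h *hub) add(id string, s sender) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.streams[id] = &lockedStream{s: s}
}

// remove unregisters a stream; call it (e.g. via defer) when the handler returns.
func (h *hub) remove(id string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	delete(h.streams, id)
}

// broadcast sends msg to every client except from. Streams whose Send
// fails are removed, and their IDs are returned.
func (h *hub) broadcast(from, msg string) []string {
	// Copy the targets under the read lock so the map lock is not
	// held while sending.
	h.mu.RLock()
	targets := make(map[string]*lockedStream, len(h.streams))
	for id, ls := range h.streams {
		if id != from {
			targets[id] = ls
		}
	}
	h.mu.RUnlock()

	var failed []string
	for id, ls := range targets {
		if err := ls.Send(msg); err != nil {
			failed = append(failed, id)
			h.remove(id)
		}
	}
	return failed
}

// recorder is a fake sender used only to exercise the sketch.
type recorder struct {
	got  []string
	fail bool
}

func (r *recorder) Send(msg string) error {
	if r.fail {
		return errors.New("stream closed")
	}
	r.got = append(r.got, msg)
	return nil
}
```

In the Chat handler you would call add with the client ID once the client is identified, defer remove, and call broadcast from the Recv loop. Two caveats with this sketch: a Send that blocks on a slow client holds up the sending handler, and remove by ID could drop a newer stream if the same client reconnects in the meantime. A common alternative is to give each client a buffered channel and have that client's own handler goroutine be the only one that ever calls Send.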