1
votes

I am writing a server handle data stream on golang using grpc. After recive a request, I should put this stream to a Chan, then a goroutine handle this request and send back. But I get a rpc error: code = Unavailable desc = transport is closing when I write back to client in the goroutine. So I wonder if I can pass stream to Channel, does this operation close the connection?

here is Recognize in protocol buffer

service AsrService {
     rpc Recognize(stream RecognizeRequest) returns (stream RecognizeResponse) {}
}

here is automatic generation using grpc

type AsrService_RecognizeServer interface {
    Send(*RecognizeResponse) error
    Recv() (*RecognizeRequest, error)
    grpc.ServerStream
}

here is put stream to Chan

func (s *ScheduleServer) Recognize(stream 
AsrService_RecognizeServer) error {
    req, err := stream.Recv() // I can use Recv here
    if err == io.EOF || err != nil {
        // do something
    }
    var asrRequest ASRRequest
    asrRequest.stream = &stream //pass stream to Chan
    ASRRequestChan <- &asrRequest

    return nil
}

Here is a goroutine to handle the Chan

type ASRRequest struct {
    stream AsrService_RecognizeServer
}

var ClientRequestChan = make(chan *ClientRequest, 200)
func HandlRequestChan() {
    for {
        select {
            case r := <- ClientRequestChan:
                Log.Infof("Chan get request info[%v]", r)
                var rsp RecognizeResponse
                rsp.Code = **
                streamInter := *r.stream
                err = streamInter.Send(&rsp) // I can use Send here
                if err != nil {
                    fmt.Printf("Grpc write failed,err[%v]", err)
                }
                fmt.Printf("return time[%v]\n",time.Now().UnixNano() / 1e6)
        }
    }    
}

then I get error rpc error: code = Unavailable desc = transport is closing, so is the stream closed after pass it to the Chan? Because if I do not use Chan, it can send result to client successfully.

1
Returning from the stream handler means that the server is done sending messages. That closes the connection with that client. After sending to ASRRequestChan, you have no other synchronization. The handler returns, and the next time you call streamInter.Send(&rsp) the connection is closed.blackgreen
With that said, the code you presented is declared as a bidirectional streaming, but in your stream handler you are receiving only once — and also sending once. So that's not a stream. It could be a simple RPC, or server-side streaming instead.blackgreen
@blackgreen I see, you mean Recognize function returned after put the stream in the Chan, so the connection is closed after Recognize returned. Right?Yongqi Z
yes. The usual practice to handle streams is to call stream.Recv in a loopblackgreen
@blackgreen I build a child goroutine to handle the request and use the sync.WaitGroup to make sure the Recognize will not return until child goroutine finish .Yongqi Z

1 Answers

1
votes

I change the strategy and use sync.WaitGroup to make sure main goroutine do not return until the stream send back. I will build a goroutine to handle this stream, and the main goroutine does not return until the child goroutine finish. So the connect will not close.

var wg sync.WaitGroup
func (s *ScheduleServer) Recognize(stream pb.AsrService_RecognizeServer) error {
    wg.Add(1)
    go s.Recognize_Syn(&wg, stream)

    wg.Wait()
    return nil
}

func (s *ScheduleServer) Recognize_Syn(wg *sync.WaitGroup, stream pb.AsrService_RecognizeServer) error {
    defer wg.Done()
    //do something
    err = stream.Send(&rsp)
    return nil
}