0
votes

Sorry if this is a noob question, i'm new to grpc's server side streaming.

What I have right now in a function on server that streams to client

req, err := http.NewRequest("GET", actualURL, nil)

//skipping some lines// res, _ := http.DefaultClient.Do(req)

// closing body
defer res.Body.Close()

body, err := ioutil.ReadAll(res.Body)

// skipping some lines//

// unmarshaling the xml data received from the GET request done above
xmlDataErr := xml.Unmarshal(body, &ArrData)

// creating a variable of a custom type and creating object of specific struct Flight
var flightArrData ArrFlights
currArrData := flightArrData.Arr.Body.Flight

log.Println("Counting total rows to be send to the client", len(currArrData))

// looping over the response and sending streaming it to the client

for i := range currArrData {

    farr := &pb.FlightArrivalsResponse{
        ArrivalResponse: &pb.ArrivalFlightData{
            Hapt:     currArrData[i].HApt,
            Fltnr:    currArrData[i].Fltnr,
            Sdt:      currArrData[i].Sdt,
            Acreg:    currArrData[i].Acreg,
            Park:     currArrData[i].Park,
            EstD:     currArrData[i].EstD,
            Gate:     currArrData[i].Gate,
            AblkD:    currArrData[i].AblkD,
            ActD:     currArrData[i].ActD,
            Callsign: currArrData[i].Callsign,
        },
    }

    senderr := stream.Send(farr)

    // skipping some lines //
}

// returning nil once done return nil }

The issue i am facing On the client side I have function that receives this response, sleeps for n number of minutes and ask for the response again.

Client do get the response as expected in first call, however for every subsequent call something strange happens and that is my current issue, I try to illustrate in the form of every subsequent calls below :

Call 1 from Client to Server --> Server returns 200 rows

client sleep for n minutes

Call 2 from client to Server --> Server returns 400 rows!! (basically every row twice as in 200 + 200)

client sleep for n minutes

Call 3 from client to Server --> Server returns 600 rows!! (200+200+200)

Summary

I do check for error == io.EOF on the client side and only way right now to stop this stacking up of response from server to client is to stop the server and restart.

I am not sure what am I missing here to make sure that I am only sending the actual and exact response that i have received from the GET request. Would greatly appreciate any hint on this.

More information

Part from gRPC protobuffer def in proto file

rpc GetFlightArrivals (FlightArrivalsRequestURL) returns (stream FlightArrivalsResponse) {
    };

Complete code for the server side impl of above gRPC

func (s *Server) GetFlightArrivals(url *pb.FlightArrivalsRequestURL, stream pb.Flight_GetFlightArrivalsServer) error {

    cData := ch.ConfigProcessor() // fetches the initial part of the URL from config file

    if url.ArrivalURL == "" {
        actualURL = cData.FURL + "/arr/all" // adding the arrival endpoint

    } else {
        actualURL = url.ArrivalURL
    }

    // build new request to get arrival data
    req, err := http.NewRequest("GET", actualURL, nil)
    if err != nil {
        log.Fatalln("Recheck URL or connectivity, failed to make REST call")
    }
    req.Header.Add("Cache-Control", "no-cache")
    req.Header.Add("Accept", "text/plain")
    req.Header.Add("Connection", "keep-alive")
    req.Header.Add("app_id", cData.AppID)
    req.Header.Add("app_key", cData.AppKey)
    req.Header.Add("Content-Type", "application/xml")


    res, _ := http.DefaultClient.Do(req)

    // closing body
    defer res.Body.Close()

    body, err := ioutil.ReadAll(res.Body)
    if err != nil {
        log.Fatalln("Failed to get any response")
        return err
    }
    // unmarshaling the xml data
    xmlDataErr := xml.Unmarshal(body, &flightArrData)
    if xmlDataErr != nil {
        log.Fatalln("Failed to unmarshal arrival xml data see error, ", xmlDataErr)
        return xmlDataErr
    }

    currArrData := flightArrData.Arr.Body.Flight
    log.Println("Counting total arrivals in Finland", len(currArrData))
    log.Println("Starting FlightDeparturesResponse for client")
    for i := range currArrData {

        farr := &pb.FlightArrivalsResponse{
            ArrivalResponse: &pb.ArrivalFlightData{
                Hapt:     currArrData[i].HApt,
                Fltnr:    currArrData[i].Fltnr,
                Sdt:      currArrData[i].Sdt,
                Acreg:    currArrData[i].Acreg,
                Park:     currArrData[i].Park,
                EstD:     currArrData[i].EstD,
                Gate:     currArrData[i].Gate,
                AblkD:    currArrData[i].AblkD,
                ActD:     currArrData[i].ActD,
                Callsign: currArrData[i].Callsign,
            },
        }

        senderr := stream.Send(farr)

        if senderr != nil {
            log.Fatalln("Failed to stream arrival response to the client, see error ", senderr)
            return senderr
        }

    }
    currArrData = nil
    log.Println("Attempting to empty the arrival data")
    return nil
}

Test case for checking above impl of gRPC

I just start a test grpc server and make call to that grpc in this test case. Implementation on the client is same around receiving the data.

func TestGetFlightArrivals(t *testing.T) {
    const addr = "localhost:50051"
    conn, err := grpc.Dial(addr, grpc.WithInsecure())
    if err != nil {
        t.Fatalf("Did not connect: #{err}")
    }
    defer conn.Close()
    f := pb.NewFlightClient(conn)

    t.Run("GetFlightArrivals", func(t *testing.T) {

        var res, err = f.GetFlightArrivals(context.Background(), &pb.FlightArrivalsRequestURL{ArrivalURL: ""})
        if err != nil {
            t.Error("Failed to make the REST call", err)
        }

        for {
            msg, err := res.Recv()
            if err == io.EOF {
                t.Log("Finished reading all the message")
                break
            }
            if err != nil {
                t.Error("Failed to receive response")
            }
            t.Log("Message from the server", msg.GetArrivalResponse())

        }
    })
}

1
Please post a minimal reproducible example. Currently your example code has large holes in it (e.g. you unmarshall into ArrData but that data is never used) which makes it difficult to comment.Brits
@brits I will get back with that soon.sup
Your explanation is not enough, but I bet you are using a pointer and not destroying it. You may be using the same pointer again and again, duplicating your rows.rubens21
I have edited my question with full code and a test case calling that impl, is this what you were looking in terms of the additional info?sup
@rubens21 i am not sure what you exactly mean with duplicating, I was expecting that send on server and receive on the client should empty the stack on server side. I have added more info now to my question.sup

1 Answers

0
votes

Based on the comment from @rubens21 I added following lines var emptyArrData ArrFlights which is basically the complete XML struct and assign this emptyArrData to the struct flightArrData = emptyArrData