Sorry if this is a noob question, i'm new to grpc's server side streaming.
What I have right now in a function on server that streams to client
req, err := http.NewRequest("GET", actualURL, nil)
//skipping some lines// res, _ := http.DefaultClient.Do(req)
// closing body
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
// skipping some lines//
// unmarshaling the xml data received from the GET request done above
xmlDataErr := xml.Unmarshal(body, &ArrData)
// creating a variable of a custom type and creating object of specific struct Flight
var flightArrData ArrFlights
currArrData := flightArrData.Arr.Body.Flight
log.Println("Counting total rows to be send to the client", len(currArrData))
// looping over the response and sending streaming it to the client
for i := range currArrData {
farr := &pb.FlightArrivalsResponse{
ArrivalResponse: &pb.ArrivalFlightData{
Hapt: currArrData[i].HApt,
Fltnr: currArrData[i].Fltnr,
Sdt: currArrData[i].Sdt,
Acreg: currArrData[i].Acreg,
Park: currArrData[i].Park,
EstD: currArrData[i].EstD,
Gate: currArrData[i].Gate,
AblkD: currArrData[i].AblkD,
ActD: currArrData[i].ActD,
Callsign: currArrData[i].Callsign,
},
}
senderr := stream.Send(farr)
// skipping some lines //
}
// returning nil once done return nil }
The issue i am facing On the client side I have function that receives this response, sleeps for n number of minutes and ask for the response again.
Client do get the response as expected in first call, however for every subsequent call something strange happens and that is my current issue, I try to illustrate in the form of every subsequent calls below :
Call 1 from Client to Server --> Server returns 200 rows
client sleep for n minutes
Call 2 from client to Server --> Server returns 400 rows!! (basically every row twice as in 200 + 200)
client sleep for n minutes
Call 3 from client to Server --> Server returns 600 rows!! (200+200+200)
Summary
I do check for error == io.EOF on the client side and only way right now to stop this stacking up of response from server to client is to stop the server and restart.
I am not sure what am I missing here to make sure that I am only sending the actual and exact response that i have received from the GET request. Would greatly appreciate any hint on this.
More information
Part from gRPC protobuffer def in proto file
rpc GetFlightArrivals (FlightArrivalsRequestURL) returns (stream FlightArrivalsResponse) {
};
Complete code for the server side impl of above gRPC
func (s *Server) GetFlightArrivals(url *pb.FlightArrivalsRequestURL, stream pb.Flight_GetFlightArrivalsServer) error {
cData := ch.ConfigProcessor() // fetches the initial part of the URL from config file
if url.ArrivalURL == "" {
actualURL = cData.FURL + "/arr/all" // adding the arrival endpoint
} else {
actualURL = url.ArrivalURL
}
// build new request to get arrival data
req, err := http.NewRequest("GET", actualURL, nil)
if err != nil {
log.Fatalln("Recheck URL or connectivity, failed to make REST call")
}
req.Header.Add("Cache-Control", "no-cache")
req.Header.Add("Accept", "text/plain")
req.Header.Add("Connection", "keep-alive")
req.Header.Add("app_id", cData.AppID)
req.Header.Add("app_key", cData.AppKey)
req.Header.Add("Content-Type", "application/xml")
res, _ := http.DefaultClient.Do(req)
// closing body
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
log.Fatalln("Failed to get any response")
return err
}
// unmarshaling the xml data
xmlDataErr := xml.Unmarshal(body, &flightArrData)
if xmlDataErr != nil {
log.Fatalln("Failed to unmarshal arrival xml data see error, ", xmlDataErr)
return xmlDataErr
}
currArrData := flightArrData.Arr.Body.Flight
log.Println("Counting total arrivals in Finland", len(currArrData))
log.Println("Starting FlightDeparturesResponse for client")
for i := range currArrData {
farr := &pb.FlightArrivalsResponse{
ArrivalResponse: &pb.ArrivalFlightData{
Hapt: currArrData[i].HApt,
Fltnr: currArrData[i].Fltnr,
Sdt: currArrData[i].Sdt,
Acreg: currArrData[i].Acreg,
Park: currArrData[i].Park,
EstD: currArrData[i].EstD,
Gate: currArrData[i].Gate,
AblkD: currArrData[i].AblkD,
ActD: currArrData[i].ActD,
Callsign: currArrData[i].Callsign,
},
}
senderr := stream.Send(farr)
if senderr != nil {
log.Fatalln("Failed to stream arrival response to the client, see error ", senderr)
return senderr
}
}
currArrData = nil
log.Println("Attempting to empty the arrival data")
return nil
}
Test case for checking above impl of gRPC
I just start a test grpc server and make call to that grpc in this test case. Implementation on the client is same around receiving the data.
func TestGetFlightArrivals(t *testing.T) {
const addr = "localhost:50051"
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
t.Fatalf("Did not connect: #{err}")
}
defer conn.Close()
f := pb.NewFlightClient(conn)
t.Run("GetFlightArrivals", func(t *testing.T) {
var res, err = f.GetFlightArrivals(context.Background(), &pb.FlightArrivalsRequestURL{ArrivalURL: ""})
if err != nil {
t.Error("Failed to make the REST call", err)
}
for {
msg, err := res.Recv()
if err == io.EOF {
t.Log("Finished reading all the message")
break
}
if err != nil {
t.Error("Failed to receive response")
}
t.Log("Message from the server", msg.GetArrivalResponse())
}
})
}
ArrData
but that data is never used) which makes it difficult to comment. – Brits