1
votes

I've created a gRPC server-side streaming service (in Rust) that streams messages from a Kafka topic on cluster A. The Go client on cluster B connects to the RPC service every time an external client initiates a websocket connection; the messages are streamed to the external client over this connection.

However, the Go client does not seem to consume reliably from the Rust server (even though the messages have definitely been committed). Even though the websocket is definitely reading information from the external client, it sometimes appears not to consume from the gRPC stream. I've tried to read the documentation and identify any resource leaks that might cause this, but I haven't been able to solve the problem.

I think the problem might be in the context handling or in how the connection is closed, as the stream initially succeeds but the messages then fail to send if you close the websocket and reopen it.

Go client code

// KafkaChannel wraps the gRPC client that Consume uses to open streams.
type KafkaChannel struct {
    // Client is the generated KafkaStream client; Consume calls its Consume
    // RPC to start a server-side stream for a topic.
    Client bridge.KafkaStreamClient
}

// Consume opens a server-streaming Consume RPC for topic and forwards the
// payload of every content-carrying response to the returned channel.
//
// The returned channel is closed by the background goroutine as soon as the
// stream ends, whether the server finished, Recv failed, or the close callback
// cancelled the context, so a `for msg := range ch` loop in the caller always
// terminates.
//
// The returned callback releases the stream: it half-closes the send side and
// cancels the context. It never touches the channel, so it is safe to call
// while the goroutine is still running and safe to call more than once.
//
// If the stream cannot be created, the error is logged, an already-closed
// channel is returned, and the callback reports the creation error.
func (kc *KafkaChannel) Consume(topic string) (<-chan []byte, func() error) {
    readChan := make(chan []byte)

    // A cancellable context is what lets the client release the resources of
    // a server-streaming RPC before the server ends it.
    ctx, cancel := context.WithCancel(context.Background())
    stream, err := kc.Client.Consume(ctx, &bridge.ConsumeRequest{
        Topic: topic,
    })
    if err != nil {
        // One failed stream must not take the whole process down (the
        // original used log.Fatalf); release the context and hand back a
        // channel the caller's range loop exits from immediately.
        log.Printf("Error creating cosumer stream: %v", err)
        cancel()
        close(readChan)
        return readChan, func() error { return err }
    }

    // Launch the listener in its own goroutine.
    go func() {
        // The goroutine is the only sender, so it is the only place that may
        // close the channel. Closing here is what unblocks the reader when
        // the stream fails or is cancelled.
        defer close(readChan)
        for {
            response, err := stream.Recv()
            if err != nil {
                // Covers io.EOF, server errors and our own cancellation.
                log.Printf("Error receiving from consumer stream: %v", err)
                return
            }
            data, ok := response.OptionalContent.(*bridge.KafkaResponse_Content)
            if !ok {
                // Responses without content are skipped.
                continue
            }
            // The channel is unbuffered: without the ctx.Done() case this
            // goroutine would block forever once the reader has gone away.
            select {
            case readChan <- data.Content:
            case <-ctx.Done():
                return
            }
        }
    }()

    // Callback that frees the resources held by the stream.
    closeCallback := func() error {
        err := stream.CloseSend()
        cancel()
        return err
    }
    return readChan, closeCallback
}

Websocket

// ChatHandler is the receiver of GetChatConnection, the websocket chat
// endpoint. None of its fields is read in the visible part of that method, so
// the notes below are assumptions to confirm against the full source.
type ChatHandler struct {
    // NOTE(review): presumably the generated database query handle - confirm.
    Database       *db.Queries
    // NOTE(review): presumably the shared gRPC connection to the Rust bridge - confirm.
    Client         *grpc.ClientConn
    // NOTE(review): usage not visible here; storing a context in a struct is unusual - confirm intent.
    Context        context.Context
    // NOTE(review): usage not visible here - confirm what is sent on this channel.
    SessionChannel chan []byte
}
// GetChatConnection relays every message received from the Kafka bridge
// stream to the websocket connection c until either the stream's channel is
// closed or a websocket write fails.
//
// The consume stream is released on every exit path. On a write error the
// handler additionally closes the websocket via handler.close.
func (handler *ChatHandler) GetChatConnection(c *websocket.Conn) {
    //initialisation...
    consumer, closeConsume := kChannel.Consume(topic)
    // Deferred so the gRPC stream and its context are freed even when the
    // loop ends because the channel was closed, not only on a write error.
    // This is the only call site, so the callback runs exactly once.
    defer closeConsume()

    for msg := range consumer {
        log.Printf("Received message from bridge: %s", string(msg))
        writeMessageStart := time.Now()

        // Message type 1 is a websocket text frame.
        err = c.WriteMessage(1, msg)
        writeMessageElapsed := time.Since(writeMessageStart)
        if err != nil {
            log.Printf("Error writing message: %v", err)
            log.Printf("Write time elapsed error: %s", writeMessageElapsed)
            if errors.Is(err, syscall.EPIPE) {
                // Broken pipe: the peer has gone away.
                log.Printf("Sys error: %v", err)
            }
            handler.close(c)
            return
        }
        log.Printf("Write time elapsed no error: %s", writeMessageElapsed)
    }
}

Rust server-side code

For completeness

async fn consume(
        &self,
        request: Request<ConsumeRequest>,
    ) -> Result<Response<Self::ConsumeStream>, Status> {
        let (tx, rx) = mpsc::unbounded_channel();
        info!("Initiated read-only stream");
        tokio::spawn(async move {
            let message = match Some(request.get_ref()) {
                Some(x) => x,
                None => return,
            };
            let topic = message.topic.clone();
            info!("Consuming on topic: {}", topic);
            let consumer = create_kafka_consumer(topic);
            loop {
                let result = consumer.stream().next().await;
                match result {
                    None => {
                        warn!("Received none-type from consumer stream");
                        continue;
                    }
                    Some(Err(e)) => {
                        error!("Error consuming from kafka broker: {:?}", e);
                        continue;
                    }
                    Some(Ok(message)) => {
                        let payload = match message.payload_view::<str>() {
                            None => {
                                warn!("Recived none-type when unpacking payload");
                                continue;
                            }
                            Some(Ok(s)) => {
                                info!("Received payload: {:?}", s);
                                s
                            }
                            Some(Err(e)) => {
                                error!("Error viewing payload contents: {}", e);
                                return;
                            }
                        };
                        info!("Received message from broker in read-only stream");
                        if payload.len() > 0 {
                            info!("Sending payload {:?}", payload);
                            match tx.send(Ok(KafkaResponse {
                                success: true,
                                optional_content: Some(
                                    bridge::kafka_response::OptionalContent::Content(
                                        (*payload).as_bytes().to_vec(),
                                    ),
                                ),
                            })) {
                                Ok(_) => info!("Successfully sent payload to client"),
                                Err(e) => {
                                    error!("GRPC error sending message to client {:?}", e);
                                    return;
                                }
                            }
                        } else {
                            warn!("No content detected in payload from broker");
                        }
                        match consumer.commit_message(&message, CommitMode::Async) {
                            Ok(_) => (),
                            Err(e) => {
                                error!("Error commiting a consumed message: {:?}", e);
                                return;
                            }
                        }
                    }
                }
            }
        });
        Ok(Response::new(Box::pin(
            tokio_stream::wrappers::UnboundedReceiverStream::new(rx),
        )))
    }
1

1 Answers

0
votes

The problem was a resource leak in the Go client and, to some extent, in the Rust server too.

Gracefully closing the Go client stream:

For a streaming rpc called Consume

// Create the stream with a cancellable context so that it can be torn down
// from the client side.
ctx, cancel := context.WithCancel(context.Background())
// NOTE(review): err is not checked in this excerpt - handle it before using stream.
stream, err := handler.Client.Consume(ctx, &bridge.ConsumeRequest{
    Topic: topic,
})
// closeCallback half-closes the send side of the stream, cancels the stream's
// context and closes the websocket connection. The error returned by
// CloseSend is ignored here.
closeCallback := func() {
    stream.CloseSend()
    cancel()
    c.Close() // where c := *websocket.Conn
}
// Run the cleanup when the surrounding function returns.
defer closeCallback()

It's important to initialise the stream with a cancelable context so that the runtime can free the resources associated with the multiplexed rpc. Calling cancel() shuts down the connection on the client-side (this will show up as an error on the server when it tries to send a new message).

Detecting client.CloseSend() on the Rust server side

Calling stream.CloseSend() in the Go client does not notify the server to terminate the connection; it merely signals that the client will stop sending messages. In this particular case, I am doing work in async tasks within the streaming RPC connections, so in order to shut down gracefully when a client disconnects from the websocket, it was important to detect the CloseSend packet and close the UnboundedReceiver channel, which in turn shuts down the connection.

Note that the motivation for doing this was that, otherwise, the server would keep trying to send packets to a 'dead' receiver because it would think the connection is still alive.

The following code was adapted from this thread:

/// Wrapper around an unbounded mpsc receiver.
///
/// In this file it gets a `Stream` implementation that forwards to the
/// receiver and a `Drop` implementation that logs and closes the receiver.
pub struct DropReceiver<T> {
    /// The wrapped receiving half of the channel.
    rx: mpsc::UnboundedReceiver<T>,
}

impl<T> Stream for DropReceiver<T> {
    type Item = T;

    /// Polls the wrapped receiver for its next item and returns the result
    /// unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The receiver is polled through a plain `&mut`, so the pin can be
        // unwrapped instead of re-pinning the field.
        let this = self.get_mut();
        this.rx.poll_recv(cx)
    }
}

// Runs when the `DropReceiver` value itself is dropped.
// NOTE(review): the surrounding text ties this to the client's `CloseSend`;
// presumably the gRPC runtime drops the response stream once the call is
// over, which is what triggers this - confirm.
impl<T> Drop for DropReceiver<T> {
    /// Logs the drop and closes the wrapped receiver.
    fn drop(&mut self) {
        info!("Connection has been droped");
        self.rx.close(); // Shutdown the receiver
    }
}

// Within the rpc

// Create the channel and wrap the receiving half so that dropping the
// response stream is logged and closes the receiver.
let (tx, rx) = mpsc::unbounded_channel();
let rx = DropReceiver { rx };
// The placeholder is a plain block comment: a `/** ... */` doc comment with
// nothing after it to document is rejected by the compiler.
tokio::spawn(async move { /* work */ });
Ok(Response::new(Box::pin(rx))) // return this