I've created a gRPC server-side streaming service (in Rust) that streams messages from a kafka topic on cluster A
. The Go client on cluster B
connects to the rpc service everytime an external client initiates a websocket connection; the messages are streamed to the external client over this connection.
However, the Go client does not seem to reliably consume from the rust server, (even though the messages have definitely been committed). Even-though the websocket is definitely reading information in from the external client; it appears not to consume from the grpc stream sometimes. I've tried to read the documentation and identify any resource leaks that might cause this but I haven't been able to solve the problem.
I think the problem might be in the context handling, or around closing the connection (as the stream initially succeeds, then the messages fail to send if you close the websocket and reopen it)
Go client code
type KafkaChannel struct {
Client bridge.KafkaStreamClient
}
func (kc *KafkaChannel) Consume(topic string) (<-chan []byte, func() error) {
readChan := make(chan []byte)
// Context with cancellation to close the routine
ctx, cancel := context.WithCancel(context.Background())
stream, err := kc.Client.Consume(ctx, &bridge.ConsumeRequest{
Topic: topic,
})
if err != nil {
log.Fatalf("Error creating cosumer stream: %v", err)
}
// Launch listener in new thread
go func(reader *chan []byte, consumeStream *bridge.KafkaStream_ConsumeClient) {
// Recover from a panic (resources consumed)
defer func() {
if recover() != nil {
log.Println("Consumer routine closed")
}
}()
for {
response, err := stream.Recv()
if err != nil {
log.Printf("Error creating cosumer stream: %v", err)
break
}
switch data := response.OptionalContent.(type) {
case *bridge.KafkaResponse_Content:
*reader <- *&data.Content
default:
break
}
}
}(&readChan, &stream)
// Create a callback func that frees the resources
closeCallback := func() error {
err := stream.CloseSend()
close(readChan)
cancel()
return err
}
return readChan, closeCallback
}
Websocket
type ChatHandler struct {
Database *db.Queries
Client *grpc.ClientConn
Context context.Context
SessionChannel chan []byte
}
func (handler *ChatHandler) GetChatConnection(c *websocket.Conn) {
//initialisation...
consumer, closeConsume := kChannel.Consume(topic)
for msg := range consumer {
log.Printf("Received message from bridge: %s", string(msg))
writeMessageStart := time.Now()
if err = c.WriteMessage(1, msg); err != nil {
log.Printf("Error writing message: %v", err)
writeMessageElapsed := time.Since(writeMessageStart)
log.Printf("Write time elapsed error: %s", writeMessageElapsed)
if errors.Is(err, syscall.EPIPE) {
log.Printf("Sys error: %v", err)
//continue
}
closeConsume()
handler.close(c)
return
}
writeMessageElapsed := time.Since(writeMessageStart)
log.Printf("Write time elapsed no error: %s", writeMessageElapsed)
}
}
Rust server-side code
For completeness
async fn consume(
&self,
request: Request<ConsumeRequest>,
) -> Result<Response<Self::ConsumeStream>, Status> {
let (tx, rx) = mpsc::unbounded_channel();
info!("Initiated read-only stream");
tokio::spawn(async move {
let message = match Some(request.get_ref()) {
Some(x) => x,
None => return,
};
let topic = message.topic.clone();
info!("Consuming on topic: {}", topic);
let consumer = create_kafka_consumer(topic);
loop {
let result = consumer.stream().next().await;
match result {
None => {
warn!("Received none-type from consumer stream");
continue;
}
Some(Err(e)) => {
error!("Error consuming from kafka broker: {:?}", e);
continue;
}
Some(Ok(message)) => {
let payload = match message.payload_view::<str>() {
None => {
warn!("Recived none-type when unpacking payload");
continue;
}
Some(Ok(s)) => {
info!("Received payload: {:?}", s);
s
}
Some(Err(e)) => {
error!("Error viewing payload contents: {}", e);
return;
}
};
info!("Received message from broker in read-only stream");
if payload.len() > 0 {
info!("Sending payload {:?}", payload);
match tx.send(Ok(KafkaResponse {
success: true,
optional_content: Some(
bridge::kafka_response::OptionalContent::Content(
(*payload).as_bytes().to_vec(),
),
),
})) {
Ok(_) => info!("Successfully sent payload to client"),
Err(e) => {
error!("GRPC error sending message to client {:?}", e);
return;
}
}
} else {
warn!("No content detected in payload from broker");
}
match consumer.commit_message(&message, CommitMode::Async) {
Ok(_) => (),
Err(e) => {
error!("Error commiting a consumed message: {:?}", e);
return;
}
}
}
}
}
});
Ok(Response::new(Box::pin(
tokio_stream::wrappers::UnboundedReceiverStream::new(rx),
)))
}