I'm setting up WebSockets on .NET Core using middleware (it looks something like https://radu-matei.com/blog/aspnet-core-websockets-middleware/), but I have one issue. My client communicates with the WebSocket through HTTP; in other words, the WebSocket is wrapped in HTTP. When my client sends an HTTP request, it goes to the service method, which finally sends a message over the WebSocket. Problem: is it possible to receive the WebSocket reply to that message before returning the HTTP response?

public async Task SendMessage(WebSocket socket, string response)
{
    if (socket.State != WebSocketState.Open)
        return;

    byte[] payload = Encoding.UTF8.GetBytes(response);
    await socket.SendAsync(buffer: new ArraySegment<byte>(array: payload,
            offset: 0,
            count: payload.Length), // byte count, not response.Length (they differ for non-ASCII text)
        messageType: WebSocketMessageType.Text,
        endOfMessage: true,
        cancellationToken: CancellationToken.None);

    // When I finally receive some reply message, stop listening and return http response to my client
    var reply = new ArraySegment<byte>(new byte[4096]);
    await socket.ReceiveAsync(reply, CancellationToken.None);
}
You basically want to read the header of the initial HTTP request that then switches protocols to WS? - Bercovici Adrian
What I want isn't that simple, I guess. I have a middleware that listens for messages, but I would like to listen not only in the middleware but in a service method as well. When an HTTP request comes in, it goes to controller -> service, and there I would like to start listening for messages on the WebSockets that were initialized in the middleware earlier. When the required message arrives, I finish the service method and return the response. - Andrius
I have added an implementation; does that suit you? If not, could you give further explanation of what you plan to achieve? - Bercovici Adrian

1 Answer

From what you said, I understand that you need to share the WebSocket between multiple services, each using it as it sees fit. For this scenario, the proposed implementation has a middleware that holds the different services that require the socket. Special care must be taken when deciding which service does the writing and which does the reading: a WebSocket is safe to use for one read and one write at the same time, but not for the other combinations (write-write, read-read).

Startup

public void ConfigureServices(IServiceCollection collection){
     collection.AddSingleton<Sender>();
     collection.AddSingleton<Receiver>();
 }
 public void Configure(IApplicationBuilder app)
 {
    app.UseWebSockets();
    app.UseMiddleware<DispatcherMiddleware>(); //receives socket here
 }

Middleware

public class DispatcherMiddleware{
    private Sender sender;
    private Receiver receiver;
    private RequestDelegate next;
   public DispatcherMiddleware(RequestDelegate req,Sender _sender,Receiver _receiver)
   {
    this.sender=_sender;
    this.receiver=_receiver;
    this.next=req;
   }
    public async Task Invoke(HttpContext context){
        if(!context.WebSockets.IsWebSocketRequest){
            await this.next(context); // not a WebSocket handshake: pass the request down the pipeline
            return;
        }
        await DispatchAsync(context.WebSockets);
    }
    public async Task DispatchAsync(WebSocketManager manager){
         WebSocket socket=await manager.AcceptWebSocketAsync();
         // One task writes and one reads; a single send and a single receive may run concurrently
         Task t1=Task.Run(async()=>await this.sender.SendAsync(socket));
         Task t2=Task.Run(async()=>await this.receiver.ReceiveAsync(socket));
         await Task.WhenAll(t1,t2);
    }
}

Websocket services (example)

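public class Sender{
    public async Task SendAsync(WebSocket ws){
        try{
            while(ws.State==WebSocketState.Open){
               // await ws.SendAsync(...) with the next outgoing message
               await Task.Delay(100); // placeholder so the skeleton loop does not spin
            }
        }
        catch(WebSocketException){
            // connection dropped: leave the loop instead of swallowing every exception
        }
    }
}
public class Receiver{
    public async Task ReceiveAsync(WebSocket ws){
        var buffer=new ArraySegment<byte>(new byte[1024]);
        while(ws.State==WebSocketState.Open){
            WebSocketReceiveResult result=await ws.ReceiveAsync(buffer,CancellationToken.None);
            if(result.MessageType==WebSocketMessageType.Close){
                break; // the client asked to close the connection
            }
            // handle the result.Count bytes now in buffer
        }
    }
}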
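
For the original question (holding the HTTP response until a reply arrives), one possible approach is to let the receive loop hand the next message to whoever is waiting for it. The sketch below assumes a hypothetical `ReplyWaiter` helper, which is not part of ASP.NET Core, registered as a singleton and shared by the service and the `Receiver`; it also assumes only one request waits at a time and that any incoming message counts as the reply.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: lets a service method await the next message
// that the middleware's receive loop reads from the WebSocket.
public class ReplyWaiter
{
    private readonly object gate = new object();
    private TaskCompletionSource<string> pending;

    // Registers the waiter synchronously (before the first await),
    // so call this BEFORE sending to avoid missing a fast reply.
    public async Task<string> WaitForReplyAsync(TimeSpan timeout)
    {
        var tcs = new TaskCompletionSource<string>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        lock (gate) { pending = tcs; }

        Task finished = await Task.WhenAny(tcs.Task, Task.Delay(timeout));
        if (finished != tcs.Task)
        {
            lock (gate) { if (pending == tcs) pending = null; }
            throw new TimeoutException("No WebSocket reply arrived in time.");
        }
        return await tcs.Task;
    }

    // Called by the receive loop for each decoded text message.
    // Returns true if a waiting request consumed the message.
    public bool OnMessage(string message)
    {
        TaskCompletionSource<string> tcs;
        lock (gate)
        {
            tcs = pending;
            pending = null;
        }
        return tcs != null && tcs.TrySetResult(message);
    }
}
```

In the service method the order would be: call `WaitForReplyAsync` and keep the returned task, send the message over the socket, then await the task and return the HTTP response. The `Receiver` loop would decode each message and pass it to `OnMessage`. Matching a reply to a specific request (for example by a correlation id) is left out of this sketch.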

Edit
You cannot perform concurrent reads, or concurrent writes, on the same socket. That said, if you want several services to use the same socket, you can make access to it thread-safe. Since the operations are async, I suggest using the SemaphoreSlim class.

Below is an implementation of a shared socket:

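public class SafeSocket {
        private const int BUFFER_SIZE = 1024;
        private WebSocket socket { get; set; }
        // One permit: only one caller may read from the socket at a time
        private SemaphoreSlim @lock = new SemaphoreSlim(1);
        public SafeSocket(WebSocket socket) {
            this.socket = socket;
        }
        public async Task<byte[]> ReadAsync() {
            byte[] buffer = ArrayPool<byte>.Shared.Rent(BUFFER_SIZE);
            await @lock.WaitAsync();
            try {
                // A read must call ReceiveAsync, not SendAsync; this reads a single frame
                WebSocketReceiveResult result = await this.socket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
                // Copy out only the bytes received; the rented array may be larger than BUFFER_SIZE
                byte[] message = new byte[result.Count];
                Buffer.BlockCopy(buffer, 0, message, 0, result.Count);
                return message;
            } finally {
                @lock.Release();
                ArrayPool<byte>.Shared.Return(buffer); // give the rented buffer back to the pool
            }
        }

    }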
    public class DispatcherMiddleware {
        private  List<Sender> senders;
        private Receiver receiver;
        private RequestDelegate next;
        public DispatcherMiddleware(RequestDelegate req, List<Sender> _senders, Receiver _receiver) {
            this.senders = _senders;
            this.receiver = _receiver;
            this.next = req;
        }
        public async Task Invoke(HttpContext context) {
            if (!context.WebSockets.IsWebSocketRequest) {
                await this.next(context); // not a WebSocket handshake: continue the pipeline
                return;
            }
            await DispatchAsync(context.WebSockets);
        }
        public async Task DispatchAsync(WebSocketManager manager) {

            WebSocket socket = await manager.AcceptWebSocketAsync();
            SafeSocket commonSocket = new SafeSocket(socket);
            Task[] senderTasks = new Task[this.senders.Count];
            for (int senderIndex = 0; senderIndex < senderTasks.Length; senderIndex++) {
                int index = senderIndex;// careful at index ! , make copy and use it inside closure !
                senderTasks[senderIndex] = Task.Run(async () => {
                    await commonSocket.ReadAsync();
                });
            }
            // Keep the request alive until every task is done; returning early closes the socket
            await Task.WhenAll(senderTasks);
        }
    }

Keep in mind that message order will not be preserved. The same can be applied to the receivers.
So what you will end up with is N senders and K receivers, where at any time T:

  1. 1 sender will write
  2. 1 receiver will read
  3. N-1 senders will wait for the lock
  4. K-1 receivers will wait for the lock

So in the end there will be at most 2 operations in flight at any given time (one send and one receive, assuming senders and receivers use separate locks). I do not know if that is what you need, though.
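
To get the "one sender writes while one receiver reads" behaviour described above, sends and receives need separate locks. A minimal sketch, assuming a hypothetical `DuplexSafeSocket` class (a variant of `SafeSocket`, not a library type) that returns one frame per read and ignores close and fragmented messages:

```csharp
using System;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of SafeSocket with separate send and receive locks.
public class DuplexSafeSocket
{
    private readonly WebSocket socket;
    private readonly SemaphoreSlim sendLock = new SemaphoreSlim(1, 1);
    private readonly SemaphoreSlim receiveLock = new SemaphoreSlim(1, 1);

    public DuplexSafeSocket(WebSocket socket)
    {
        this.socket = socket;
    }

    // At most one sender at a time; receivers are not blocked by this lock.
    public async Task WriteAsync(byte[] payload, CancellationToken token = default)
    {
        await sendLock.WaitAsync(token);
        try
        {
            await socket.SendAsync(new ArraySegment<byte>(payload),
                WebSocketMessageType.Text, true, token);
        }
        finally
        {
            sendLock.Release();
        }
    }

    // At most one receiver at a time; returns the bytes of a single frame.
    public async Task<byte[]> ReadAsync(CancellationToken token = default)
    {
        var buffer = new byte[1024];
        await receiveLock.WaitAsync(token);
        try
        {
            WebSocketReceiveResult result =
                await socket.ReceiveAsync(new ArraySegment<byte>(buffer), token);
            var message = new byte[result.Count];
            Buffer.BlockCopy(buffer, 0, message, 0, result.Count);
            return message;
        }
        finally
        {
            receiveLock.Release();
        }
    }
}
```

With this shape, N senders queue on `sendLock` and K receivers queue on `receiveLock`, so a pending read never delays a write. Which waiter gets the lock next is not guaranteed, which is why message order across callers is not preserved.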