2
votes

I have an application which simply pulls items from a queue and then attempts to send them asynchronously via a network socket.

I am experiencing some issues when things go wrong or the client aborts the host socket.

Here is some of my code: I think it may explain better than my words:

Here is my SocketState.cs class which holds the socket and related info:

public class SocketState
{
    public const int BufferSize = 256;
    public Socket WorkSocket { get; set; }
    public byte[] Buffer { get; set; }   

    /// <summary>
    /// Constructor receiving a socket
    /// </summary>
    /// <param name="socket"></param>
    public SocketState(Socket socket)
    {
        WorkSocket = socket;
        Buffer = new byte[BufferSize];
    }
}

Here is my SocketHandler.cs class which controls most of the socket operations:

public class SocketHandler : IObserver
{
    # region Class Variables
    private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
    private SocketState _state;
    private OutgoingConnectionManager _parentConnectionManager;
    private int _recieverId;
    private readonly ManualResetEvent _sendDone = new ManualResetEvent(false);

    public volatile bool NameIsSet = false;
    private ManualResetEvent _receiveDone = new ManualResetEvent(false);
    public String Name;
    public readonly Guid HandlerId;
    # endregion Class Variables

    /// <summary>
    /// Constructor
    /// </summary>
    public SocketHandler(SocketState state)
    {
        HandlerId = Guid.NewGuid();
        _state = state;       
        _state.WorkSocket.BeginReceive(_state.Buffer, 0, SocketState.BufferSize, 0, new AsyncCallback(ReceiveCallback), this._state);
    }

    /// <summary>
    /// Set the receiver id for this socket.
    /// </summary>
    public void SetReceiverId(int receiverId)
    {
        _recieverId = receiverId;
    }

    /// <summary>
    /// Stops / closes a connection
    /// </summary>
    public void Stop()
    {
        CloseConnection();
    }

    /// <summary>
    /// Used to set this connections parent connection handler
    /// </summary>
    /// <param name="conMan"></param>
    public void SetConnectionManager(OutgoingConnectionManager conMan)
    {
        _parentConnectionManager = conMan;
    }

    /// <summary>
    /// Returns if the socket is connected or not
    /// </summary>
    /// <returns></returns>
    public bool IsConnected()
    {
        return _state.WorkSocket.Connected;
    }

    /// <summary>
    /// Public Class that implements the observer interface , this function provides a portal to receive new messages which it must send
    /// </summary>
    /// <param name="e"> Event to execute</param>        
    public void OnMessageRecieveEvent(ObserverEvent e)
    {
        SendSignalAsync(e.Message.payload);           
    }
    # region main working space

    # region CallBack Functions
    /// <summary>
    /// CallBack Function that is called when a connection Receives Some Data
    /// </summary>
    /// <param name="ar"></param>
    private void ReceiveCallback(IAsyncResult ar)
    {
        try
        {
            String content = String.Empty;
            if (ar != null)
            {
                SocketState state = (SocketState)ar.AsyncState;
                if (state != null)
                {
                    Socket handler = state.WorkSocket;
                    if (handler != null)
                    {
                        int bytesRead = handler.EndReceive(ar);

                        if (bytesRead > 0)
                        {
                            StringBuilder Sb = new StringBuilder();
                            Sb.Append(Encoding.Default.GetString(state.Buffer, 0, bytesRead));

                            if (Sb.Length > 1)
                            {
                                content = Sb.ToString();

                                foreach (var s in content.Split('Ÿ'))
                                {
                                    if (string.Compare(s, 0, "ID", 0, 2) == 0)
                                    {
                                        Name = s.Substring(2);
                                        NameIsSet = true;
                                    }

                                    if (string.Compare(s, 0, "TG", 0, 2) == 0)
                                    {
                                        LinkReplyToTag(s.Substring(2), this.Name);
                                    }
                                }
                                _state.WorkSocket.BeginReceive(_state.Buffer, 0, SocketState.BufferSize, 0,
                                    new AsyncCallback(ReceiveCallback), _state);
                            }
                        }
                    }
                }
            }
        }
        catch
        {
            CloseConnection();
        }
    }

    /// <summary>
    /// Call Back Function called when data is send though this connection
    /// </summary>
    /// <param name="asyncResult"></param>
    private void SendCallback(IAsyncResult asyncResult)
    {
        try
        {
            if (asyncResult != null)
            {
                Socket handler = (Socket)asyncResult.AsyncState;
                if (handler != null)
                {
                    int bytesSent = handler.EndSend(asyncResult);
                    // Signal that all bytes have been sent.
                    _sendDone.Set();
                    if (bytesSent > 0)
                    {
                        return;
                    }
                }
            }
        }
        catch (Exception e)
        {                
            Log.Error("Transmit Failed On Send CallBack");
        }
        //Close socket as something went wrong
        CloseConnection();
    }
    # endregion CallBack Functions

    /// <summary>
    /// Sends a signal out of the current connection
    /// </summary>
    /// <param name="signal"></param>
    private void SendSignalAsync(Byte[] signal)
    {
        try
        {
            if (_state != null)
            {
                if (_state.WorkSocket != null)
                {
                    if (_state.WorkSocket.Connected)
                    {
                        try
                        {
                            _sendDone.Reset();
                            _state.WorkSocket.BeginSend(signal, 0, signal.Length, 0, new AsyncCallback(SendCallback),
                                _state.WorkSocket);
                            _sendDone.WaitOne(200);
                            return;
                        }
                        catch (Exception e)
                        {
                            Log.Error("Transmission Failier for IP: " + ((IPEndPoint)_state.WorkSocket.RemoteEndPoint).Address, e);
                        }
                    }
                }
            }
            //Close Connection as something went wrong
            CloseConnection();
        }
        catch (Exception e)
        {
            Log.Error("An Exception has occurred in the SendSignalAsync function", e);
        }
    }     


    /// <summary>
    /// Call this to Close the connection
    /// </summary>
    private void CloseConnection()
    {
        try
        {
            var ip = "NA";
            try
            {
                if (_state != null)
                {
                    ip = ((IPEndPoint)_state.WorkSocket.RemoteEndPoint).Address.ToString();
                }
            }
            catch
            {
                //Cannot get the ip address
            }
            OutgoingListeningServer.UpdateRecieversHistory(_recieverId, ip, "Disconnected");

            try
            {
                if (_state != null)
                {
                    if (_state.WorkSocket != null)
                    {
                        _state.WorkSocket.Shutdown(SocketShutdown.Both);
                        _state.WorkSocket.Close();
                        //_state.WorkSocket.Dispose();
                        _state.WorkSocket = null;
                        _state = null;
                    }
                }
            }
            catch (Exception e)
            {
                _state = null;
                Log.Error("Error while trying to close socket for IP: " + ip, e);
            }
            if (_parentConnectionManager != null)
            {
                // Remove this connection from the connection list
                _parentConnectionManager.ConnectionRemove(this);
            }
        }
        catch (Exception e)
        {
            Log.Error("A major error occurred in the close connection function, outer try catch was hit", e);
        }
    }  
    # endregion main working space
}

And here is my thread which will then call the OnMessageRecieveEvent() function in the SocketHandler.cs class.

  private void Main()
    {
        Log.Info("Receiver" + ReceiverDb.Id + " Thread Starting");
        // Exponential back off 
        var eb = new ExponentialBackoff();
        try
        {
            while (_run)
            {
                try
                {
                    if (ReceiverOutgoingConnectionManager.HasConnectedClient())
                    {
                        //Fetch Latest Item
                        ILogItem logItem;
                        if (_receiverQueue.TryDequeue(out logItem))
                        {
                            //Handle the logItem 
                           **calls the OnMessageRecieveEvent() for all my connections.                       
                                    ReceiverOutgoingConnectionManager.SendItemToAllConnections(logItem);  
                            //Reset the exponential back off counter
                            eb.reset();
                        }
                        else
                        {
                            //Exponential back off thread sleep
                            eb.sleep();
                        }
                    }
                    else
                    {
                        //Exponential back off thread sleep
                        eb.sleep();
                    }
                }
                catch (Exception e)
                {
                    Log.Error("Error occurred in " + ReceiverDb.Id + "receiver mains thread ", e);
                }
            }
        }
        catch (Exception e)
        {
            Log.Error(" ** An error has occurred in a receiver holder main thread ** =====================================================================>", e);
        }
        Log.Info("Receiver" + ReceiverDb.Id + " Thread Exiting ** ===============================================================================>");
    }

I apologize for so much code. But I fear that it might not be something obvious so I posted all related code.

Now to further explain my issue. If an error happens on the socket. I get allot of Transmit Failed On Send CallBack. which to me means that i'm not closing the socket correctly, and there are still outstanding callback being executed.

Is there no way I can cancel all outstanding callback when i close the socket?

I am also sure there will be a few suggestions / issues with my code I posted. I would more than appreciate the feedback.

1
Man, this callback is just outstanding! #just_had_toSimpleVar
What would canceling unperformed actions mean? Are you trying to notify the callers that their actions were canceled due to socket closing? As I see it, if you have a queue of actions, and then you stop going through that queue, the queued actions are unperformed, overlooked, ignored, and canceled.SimpleVar

1 Answers

1
votes

I'm going to assume this is for learning purposes, since writing your own networking code for other purposes is somewhat... hard.

Getting an exception in the send callback is fine. That's what exceptions are for. However, you need to identify the exception, rather than just catching the blanket Exception and pretty much ignoring all the information inside.

Handle the exceptions you can handle in the places where it's proper for them to be handled.

I'm not going to be very specific, because there's a lot of issues with your code. But one of the key things is ignoring proper socket handling - when you receive 0 bytes, it means you're supposed to close the socket. That's the signal from the other side saying "we're done". By ignoring this, you're working with a (perhaps partially) closed connection.

Please, use some networking library that can give you the guarantees (and simplicity) you need. WCF, Lindgren, whatever. TCP doesn't guarantee you'll get the message in one part, for example, so your message parsing code is unreliable. You need to use some message framing, you need to add proper error handling... Socket isn't a high-level construct, it doesn't work "automagically", you need to implement all this stuff on your own.

Even ignoring the network code itself, it's obvious you're just ignoring most of the complexities of asynchronous code. What's up with the SendDone? Either you want to use asynchronous code, and then get rid of the synchronicity, or you want synchronous code, and then why are you using asynchronous sockets?