I have an application which simply pulls items from a queue and then attempts to send them asynchronously via a network socket.
I am experiencing some issues when things go wrong or the client aborts the host socket.
Here is some of my code: I think it may explain better than my words:
Here is my SocketState.cs
class which holds the socket and related info:
public class SocketState
{
public const int BufferSize = 256;
public Socket WorkSocket { get; set; }
public byte[] Buffer { get; set; }
/// <summary>
/// Constructor receiving a socket
/// </summary>
/// <param name="socket"></param>
public SocketState(Socket socket)
{
WorkSocket = socket;
Buffer = new byte[BufferSize];
}
}
Here is my SocketHandler.cs
class which controls most of the socket operations:
public class SocketHandler : IObserver
{
# region Class Variables
private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
private SocketState _state;
private OutgoingConnectionManager _parentConnectionManager;
private int _recieverId;
private readonly ManualResetEvent _sendDone = new ManualResetEvent(false);
public volatile bool NameIsSet = false;
private ManualResetEvent _receiveDone = new ManualResetEvent(false);
public String Name;
public readonly Guid HandlerId;
# endregion Class Variables
/// <summary>
/// Constructor
/// </summary>
public SocketHandler(SocketState state)
{
HandlerId = Guid.NewGuid();
_state = state;
_state.WorkSocket.BeginReceive(_state.Buffer, 0, SocketState.BufferSize, 0, new AsyncCallback(ReceiveCallback), this._state);
}
/// <summary>
/// Set the receiver id for this socket.
/// </summary>
public void SetReceiverId(int receiverId)
{
_recieverId = receiverId;
}
/// <summary>
/// Stops / closes a connection
/// </summary>
public void Stop()
{
CloseConnection();
}
/// <summary>
/// Used to set this connections parent connection handler
/// </summary>
/// <param name="conMan"></param>
public void SetConnectionManager(OutgoingConnectionManager conMan)
{
_parentConnectionManager = conMan;
}
/// <summary>
/// Returns if the socket is connected or not
/// </summary>
/// <returns></returns>
public bool IsConnected()
{
return _state.WorkSocket.Connected;
}
/// <summary>
/// Public Class that implements the observer interface , this function provides a portal to receive new messages which it must send
/// </summary>
/// <param name="e"> Event to execute</param>
public void OnMessageRecieveEvent(ObserverEvent e)
{
SendSignalAsync(e.Message.payload);
}
# region main working space
# region CallBack Functions
/// <summary>
/// CallBack Function that is called when a connection Receives Some Data
/// </summary>
/// <param name="ar"></param>
private void ReceiveCallback(IAsyncResult ar)
{
try
{
String content = String.Empty;
if (ar != null)
{
SocketState state = (SocketState)ar.AsyncState;
if (state != null)
{
Socket handler = state.WorkSocket;
if (handler != null)
{
int bytesRead = handler.EndReceive(ar);
if (bytesRead > 0)
{
StringBuilder Sb = new StringBuilder();
Sb.Append(Encoding.Default.GetString(state.Buffer, 0, bytesRead));
if (Sb.Length > 1)
{
content = Sb.ToString();
foreach (var s in content.Split('Ÿ'))
{
if (string.Compare(s, 0, "ID", 0, 2) == 0)
{
Name = s.Substring(2);
NameIsSet = true;
}
if (string.Compare(s, 0, "TG", 0, 2) == 0)
{
LinkReplyToTag(s.Substring(2), this.Name);
}
}
_state.WorkSocket.BeginReceive(_state.Buffer, 0, SocketState.BufferSize, 0,
new AsyncCallback(ReceiveCallback), _state);
}
}
}
}
}
}
catch
{
CloseConnection();
}
}
/// <summary>
/// Call Back Function called when data is send though this connection
/// </summary>
/// <param name="asyncResult"></param>
private void SendCallback(IAsyncResult asyncResult)
{
try
{
if (asyncResult != null)
{
Socket handler = (Socket)asyncResult.AsyncState;
if (handler != null)
{
int bytesSent = handler.EndSend(asyncResult);
// Signal that all bytes have been sent.
_sendDone.Set();
if (bytesSent > 0)
{
return;
}
}
}
}
catch (Exception e)
{
Log.Error("Transmit Failed On Send CallBack");
}
//Close socket as something went wrong
CloseConnection();
}
# endregion CallBack Functions
/// <summary>
/// Sends a signal out of the current connection
/// </summary>
/// <param name="signal"></param>
private void SendSignalAsync(Byte[] signal)
{
try
{
if (_state != null)
{
if (_state.WorkSocket != null)
{
if (_state.WorkSocket.Connected)
{
try
{
_sendDone.Reset();
_state.WorkSocket.BeginSend(signal, 0, signal.Length, 0, new AsyncCallback(SendCallback),
_state.WorkSocket);
_sendDone.WaitOne(200);
return;
}
catch (Exception e)
{
Log.Error("Transmission Failier for IP: " + ((IPEndPoint)_state.WorkSocket.RemoteEndPoint).Address, e);
}
}
}
}
//Close Connection as something went wrong
CloseConnection();
}
catch (Exception e)
{
Log.Error("An Exception has occurred in the SendSignalAsync function", e);
}
}
/// <summary>
/// Call this to Close the connection
/// </summary>
private void CloseConnection()
{
try
{
var ip = "NA";
try
{
if (_state != null)
{
ip = ((IPEndPoint)_state.WorkSocket.RemoteEndPoint).Address.ToString();
}
}
catch
{
//Cannot get the ip address
}
OutgoingListeningServer.UpdateRecieversHistory(_recieverId, ip, "Disconnected");
try
{
if (_state != null)
{
if (_state.WorkSocket != null)
{
_state.WorkSocket.Shutdown(SocketShutdown.Both);
_state.WorkSocket.Close();
//_state.WorkSocket.Dispose();
_state.WorkSocket = null;
_state = null;
}
}
}
catch (Exception e)
{
_state = null;
Log.Error("Error while trying to close socket for IP: " + ip, e);
}
if (_parentConnectionManager != null)
{
// Remove this connection from the connection list
_parentConnectionManager.ConnectionRemove(this);
}
}
catch (Exception e)
{
Log.Error("A major error occurred in the close connection function, outer try catch was hit", e);
}
}
# endregion main working space
}
And here is my thread which will then call the OnMessageRecieveEvent()
function in the SocketHandler.cs
class.
private void Main()
{
Log.Info("Receiver" + ReceiverDb.Id + " Thread Starting");
// Exponential back off
var eb = new ExponentialBackoff();
try
{
while (_run)
{
try
{
if (ReceiverOutgoingConnectionManager.HasConnectedClient())
{
//Fetch Latest Item
ILogItem logItem;
if (_receiverQueue.TryDequeue(out logItem))
{
//Handle the logItem
**calls the OnMessageRecieveEvent() for all my connections.
ReceiverOutgoingConnectionManager.SendItemToAllConnections(logItem);
//Reset the exponential back off counter
eb.reset();
}
else
{
//Exponential back off thread sleep
eb.sleep();
}
}
else
{
//Exponential back off thread sleep
eb.sleep();
}
}
catch (Exception e)
{
Log.Error("Error occurred in " + ReceiverDb.Id + "receiver mains thread ", e);
}
}
}
catch (Exception e)
{
Log.Error(" ** An error has occurred in a receiver holder main thread ** =====================================================================>", e);
}
Log.Info("Receiver" + ReceiverDb.Id + " Thread Exiting ** ===============================================================================>");
}
I apologize for so much code. But I fear that it might not be something obvious so I posted all related code.
Now to further explain my issue.
If an error happens on the socket. I get allot of Transmit Failed On Send CallBack
. which to me means that i'm not closing the socket correctly, and there are still outstanding callback being executed.
Is there no way I can cancel all outstanding callback when i close the socket?
I am also sure there will be a few suggestions / issues with my code I posted. I would more than appreciate the feedback.