I am listening to UDP data packets on a heavy traffic network. The wireshark capture shows the UDP data packets ip address as 125.6.6.5 but when the packet is listened by my code "NetworkListener.java", I see it prints out the right information, but I do pass in this data and construct a new object "Packet" and then put into a LinkedBlockingQueue and read it in another class "Worker.java". When I print the same packet in Worker.java class I see that the ip address of the packet has being changed.
Since it being a heavy traffic network where I am getting more than 40 data packets in a second on that port and I am converting it to a diff object and storing in a linkedBlockingQueue, does the packets are colliding in the Queue and giving me an incorrect ip???
The java classes are:
public class NetworkListener implements Runnable
{
private LinkedBlockingQueue lbq;
private volatile boolean running;
public NetworkListener()
{
lbq = PacketQueue.getInstance();
running = true;
}
public void run()
{
try
{
byte[] rwhat = new byte[1024];
DatagramPacket packet = new DatagramPacket(rwhat, 1024);
// Setup listener on port 5000
InetSocketAddress isock = new InetSocketAddress(5000);
DatagramSocket datagramSocket = new DatagramSocket(null);
datagramSocket.setReuseAddress(true);
datagramSocket.bind(isock);
datagramSocket.setSoTimeout(15000); // 15 seconds timeouts
// loop reading packets
while (running)
{
try {
datagramSocket.receive(packet);
}
catch (SocketTimeoutException te) {
continue;
}
String ipAddr = new String (packet.getAddress().getHostAddress());
String payLoad = new String (packet.getData());
String strRecv = new String( "IP Address from Network Listener:[" + ipAddr + "], Payload :[" + payLoad + "]");
System.out.println(strRecv);
synchronized (lbq)
{
lbq.put(new Packet(packet.getData(), packet.getAddress().getHostAddress()));
lbq.notifyAll();
}
}
}
catch (Exception e)
{
}
}
/**
* Shutdown current thread
*/
public void shutdown()
{
running = false;
if (lbq != null)
{
synchronized (lbq)
{
lbq.notifyAll();
lbq = null;
}
}
}
}
public class Packet
{
private byte[] bytes;
private String ipAddress;
public Packet(final byte[] bytes, final String ipAddress)
{
this.bytes = bytes;
this.ipAddress = ipAddress;
}
public byte[] getBytes()
{
return bytes;
}
public String getipAddress()
{
return ipAddress;
}
}
public class PacketQueue extends LinkedBlockingQueue
{
private static volatile PacketQueue INSTANCE = null;
public static synchronized final PacketQueue getInstance()
{
if (INSTANCE == null)
INSTANCE = new PacketQueue();
return INSTANCE;
}
}
public class Worker implements Runnable
{
private LinkedBlockingQueue lbq;
private volatile boolean running;
public Worker()
{
lbq = PacketQueue.getInstance();
running = true;
}
public void run()
{
System.out.println("Thread Starting Up . . .");
while (running)
{
synchronized (lbq)
{
while (lbq.empty())
{
try
{
lbq.wait();
if (lbq == null)
return;
}
catch (InterruptedException e)
{
return;
}
}
}
Packet packet = (Packet)lbq.poll();
if (packet != null)
{
String ipAddr = new String (packet.getipAddress());
String payLoad = new String (packet.getBytes());
String strRecv = new String( "IP Address from Worker:[" + ipAddr + "], Payload :[" + payLoad + "]");
System.out.println(strRecv);
}
}
}
/**
* Shutdown current thread
*/
public void shutdown()
{
running = false;
if (lbq != null)
{
synchronized (lbq)
{
lbq.notifyAll();
lbq = null;
}
}
}
}
So if I am comparing both the print statements in NetworkListener.java and Worker.java, I would expect them to be the same ip address for a given payload but in my case sometimes its giving me correct ip address sometimes its giving an incorrect ip address. At every sec, I am seeing around 35 more data packets to be processed in the queue.
What am I doing wrong???
Is my code not able to handle heavy traffic?? Should I be using any other data structure apart from LinkedBlockingQueue?? Or should I not convert from DatagramPacket to Packet object??
Please advise and thanks for looking.