0
votes

I am listening to UDP data packets on a heavy traffic network. The wireshark capture shows the UDP data packets ip address as 125.6.6.5 but when the packet is listened by my code "NetworkListener.java", I see it prints out the right information, but I do pass in this data and construct a new object "Packet" and then put into a LinkedBlockingQueue and read it in another class "Worker.java". When I print the same packet in Worker.java class I see that the ip address of the packet has being changed.

Since it being a heavy traffic network where I am getting more than 40 data packets in a second on that port and I am converting it to a diff object and storing in a linkedBlockingQueue, does the packets are colliding in the Queue and giving me an incorrect ip???

The java classes are:

public class NetworkListener implements Runnable
{
private LinkedBlockingQueue lbq;

private volatile boolean running;


public NetworkListener()
{
    lbq = PacketQueue.getInstance();

    running = true;
}


public void run() 
{
    try 
    {
        byte[] rwhat = new byte[1024];

        DatagramPacket packet = new DatagramPacket(rwhat, 1024);

        // Setup listener on port 5000 
        InetSocketAddress isock = new InetSocketAddress(5000);

        DatagramSocket datagramSocket = new DatagramSocket(null);
        datagramSocket.setReuseAddress(true);
        datagramSocket.bind(isock);             
        datagramSocket.setSoTimeout(15000); // 15 seconds timeouts 

        // loop reading packets 
        while (running) 
        {
            try {
                datagramSocket.receive(packet);
            } 
            catch (SocketTimeoutException te) {
                continue;
            }

            String ipAddr = new String (packet.getAddress().getHostAddress());
            String payLoad = new String (packet.getData());
            String strRecv = new String( "IP Address from Network Listener:[" + ipAddr + "], Payload :[" + payLoad + "]");
            System.out.println(strRecv);

            synchronized (lbq) 
            {
                lbq.put(new Packet(packet.getData(), packet.getAddress().getHostAddress()));
                lbq.notifyAll();
            }
        }
    } 
    catch (Exception e) 
    {
    }
}

/**
 * Shutdown current thread
 */
public void shutdown()
{
    running = false;

    if (lbq != null)
    {
        synchronized (lbq) 
        {
            lbq.notifyAll();
            lbq = null;
        }
    }
}
}



public class Packet 
{
    private byte[] bytes;

    private String ipAddress;


public Packet(final byte[] bytes, final String ipAddress)
{
    this.bytes = bytes;

    this.ipAddress = ipAddress;
}

public byte[] getBytes() 
{
    return bytes;
}

public String getipAddress() 
{
    return ipAddress;
}
}



public class PacketQueue extends LinkedBlockingQueue
{
    private static volatile PacketQueue INSTANCE = null;

    public static synchronized final PacketQueue getInstance() 
    {
        if (INSTANCE == null)
            INSTANCE = new PacketQueue(); 

        return INSTANCE;
    }
}


public class Worker implements Runnable
{

private LinkedBlockingQueue lbq;

private volatile boolean running;

public Worker()
{
    lbq = PacketQueue.getInstance();

    running = true;
}


public void run() 
{
    System.out.println("Thread Starting Up . . .");

    while (running) 
    {
        synchronized (lbq) 
        {
            while (lbq.empty()) 
            {
                try 
                {
                    lbq.wait();

                    if (lbq == null)
                        return;
                }
                catch (InterruptedException e) 
                {                       
                    return;
                }
            }
        }

        Packet packet = (Packet)lbq.poll();
        if (packet != null) 
        {
            String ipAddr = new String (packet.getipAddress());             
            String payLoad = new String (packet.getBytes());

            String strRecv = new String( "IP Address from Worker:[" + ipAddr + "], Payload :[" + payLoad + "]");
            System.out.println(strRecv);                
        }
    }
}   

/**
 * Shutdown current thread
 */
public void shutdown()
{
    running = false;

    if (lbq != null)
    {
        synchronized (lbq) 
        {
            lbq.notifyAll();
            lbq = null;
        }
    }
}
}

So if I am comparing both the print statements in NetworkListener.java and Worker.java, I would expect them to be the same ip address for a given payload but in my case sometimes its giving me correct ip address sometimes its giving an incorrect ip address. At every sec, I am seeing around 35 more data packets to be processed in the queue.

What am I doing wrong???

Is my code not able to handle heavy traffic?? Should I be using any other data structure apart from LinkedBlockingQueue?? Or should I not convert from DatagramPacket to Packet object??

Please advise and thanks for looking.

1
I think you may be misinterpreting the output from Wireshark. Your Java application is likely not the only thing generating activity. - Tim Biegeleisen
no I did verify the wireshark and made sure that data is correct since we are getting the data from another network and even that wireshark capture shows the same. The data related to IP address is also correct so its in the code. - Ikshvak
start with adding actual exception handlers. Right now, exceptions get silently discarded, so errors in packet handling might cause your application to miss some. You might even be thinking that you're watching this exact packet when you really aren't. And, BTW, your code snipped should probably be more... minimal. - Jan
I did not include the logging of exceptions here....But I do have logging for those and I am not seeing any. - Ikshvak

1 Answers

0
votes

Your problem could be a race condition. Maybe other problems could be but I cannot say.

Just by looking at your code. Shouldn't this

Packet packet = (Packet)lbq.poll();

be part of the critical section and have synchronized access? This is the only access to lbq that is not protected. Is seems it should be within the synchronized section after the while loop.