
Hi, I am building a peer-to-peer network using UDP sockets. Each time the server receives a message from a client, it creates a thread to deal with the request. I am trying to implement a thread pool with a queue that stores the requests and limits the number of threads running on the machine, so that whenever a thread is free the next entry is taken from the queue (producer/consumer). I have been trying for a while now, but the new threads don't get added to the queue; I end up executing only the first thread that was added.

 
    public void run() {

    BlockingQueue<Runnable> queue=new LinkedBlockingQueue<Runnable>(10);

    ExecutorService tpes =Executors.newFixedThreadPool(10);
    int server_port = 8767;
    DatagramSocket s = null;
    while(true){
        byte[] message = new byte[1024];
        DatagramPacket p = new DatagramPacket(message, message.length);

        try{
            s = new DatagramSocket(server_port);
        }catch (SocketException e) {
            e.printStackTrace();
            System.out.println("Socket excep");
        }

        try {
            s.receive(p);
            queue.put(new RequestHandler(p));
            tpes.execute(queue.take());

        }catch (Exception e) {
            e.printStackTrace();
            System.out.println("IO EXcept");
        }
        finally{
            s.close();
        }
    }
}

I don't know where I am going wrong. Any help would be appreciated.

Is that run method the one of RequestHandler? That would create a strange recursion. Also, if you .take() the object you've just put there a few lines earlier, you could just use it directly. - zapl
The run method is not RequestHandler's; it is part of the peerNode, which can act like a server. In run I listen for requests and create a thread, and the RequestHandler class takes care of the rest. I don't want to use the object directly because I would be creating too many threads (one thread per request); I want to use a queue to store the threads and use a fixed number of threads. - Mero

1 Answer


You don't need a queue of your own: an ExecutorService already has an internal queue for tasks. I would also suggest not opening a new socket for every packet, since you can reuse a single one.

The resulting code might look roughly like this:

public void run() {
    ExecutorService tpes = Executors.newFixedThreadPool(10);
    int server_port = 8767;
    DatagramSocket s = null;
    try {
        s = new DatagramSocket(server_port);
        while (true) {
            byte[] message = new byte[1024];
            DatagramPacket p = new DatagramPacket(message, message.length);
            try {
                s.receive(p);
                tpes.execute(new RequestHandler(p));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    } catch (SocketException e) {
        e.printStackTrace();
        System.out.println("Socket excep");
    } finally {
        if (s != null) s.close();
    }
}

Each RequestHandler will run on one of 10 threads. If you get more requests than 10 threads can handle, the pending RequestHandler tasks wait in a queue inside the executor.
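
To make the executor's internal queue visible, here is a minimal sketch. It assumes a ThreadPoolExecutor built by hand with the same shape that Executors.newFixedThreadPool produces in the standard JDK (fixed pool size, unbounded LinkedBlockingQueue). The class name PoolQueueDemo and the method pendingAfterSubmit are hypothetical names used only for this illustration, and the blocking tasks stand in for RequestHandler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolQueueDemo {

    // Hypothetical helper: submits `tasks` blocking tasks to a pool of
    // `threads` workers and returns how many are waiting in the pool's queue.
    static int pendingAfterSubmit(int threads, int tasks) throws InterruptedException {
        // Assumed to mirror Executors.newFixedThreadPool(threads).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());

        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch started = new CountDownLatch(Math.min(threads, tasks));

        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        started.await();                      // every worker is now occupied
        int pending = pool.getQueue().size(); // tasks parked inside the executor

        release.countDown();                  // let all tasks finish
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return pending;
    }

    public static void main(String[] args) throws InterruptedException {
        // 10 workers, 25 tasks: 10 run immediately, 15 wait in the queue.
        System.out.println("pending = " + pendingAfterSubmit(10, 25)); // prints "pending = 15"
    }
}
```

If the backlog itself should be capped (as the 10-element queue in the question suggests), one option is to pass a bounded queue such as new LinkedBlockingQueue<Runnable>(10) to the ThreadPoolExecutor constructor; execute() will then reject tasks once the queue is full, so a rejection policy has to be chosen.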

Note: I don't think it makes sense to run RequestHandler on a thread pool. Handling 1 kB of data is not much work, and e.g. writing that data to the hard drive is faster than you can receive packets (unless you are on fibre and your hard disk is terribly slow).