I have two threads that I'm dealing with Java NIO for non-blocking sockets. This is what the threads are doing:
Thread 1: A loop that calls on the select() method of a selector. If any keys are available, they are processed accordingly.
Thread 2: Occasionally registers a SocketChannel to the selector by calling register().
The problem is, unless the timeout for select() is very small (like around 100ms), the call to register() will block indefinitely. Even though the channel is configured to be nonblocking, and the javadocs state that the Selector object is thread safe (but it's selection keys are not, I know).
So anyone have any ideas on what the issue could be? The application works perfectly if I put everything in one thread. No problems occur then, but I'd really like to have separate threads. Any help is appreciated. I've posted my example code below:
Change the select(1000) to select(100) and it'll work. Leave it as select() or select(1000) and it won't.
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class UDPSocket
{
private DatagramChannel clientChannel;
private String dstHost;
private int dstPort;
private static Selector recvSelector;
private static volatile boolean initialized;
private static ExecutorService eventQueue = Executors.newSingleThreadExecutor();
public static void init()
{
initialized = true;
try
{
recvSelector = Selector.open();
}
catch (IOException e)
{
System.err.println(e);
}
Thread t = new Thread(new Runnable()
{
@Override
public void run()
{
while(initialized)
{
readData();
Thread.yield();
}
}
});
t.start();
}
public static void shutdown()
{
initialized = false;
}
private static void readData()
{
try
{
int numKeys = recvSelector.select(1000);
if (numKeys > 0)
{
Iterator i = recvSelector.selectedKeys().iterator();
while(i.hasNext())
{
SelectionKey key = i.next();
i.remove();
if (key.isValid() && key.isReadable())
{
DatagramChannel channel = (DatagramChannel) key.channel();
// allocate every time we receive so that it's a copy that won't get erased
final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE);
channel.receive(buffer);
buffer.flip();
final SocketSubscriber subscriber = (SocketSubscriber) key.attachment();
// let user handle event on a dedicated thread
eventQueue.execute(new Runnable()
{
@Override
public void run()
{
subscriber.onData(buffer);
}
});
}
}
}
}
catch (IOException e)
{
System.err.println(e);
}
}
public UDPSocket(String dstHost, int dstPort)
{
try
{
this.dstHost = dstHost;
this.dstPort = dstPort;
clientChannel = DatagramChannel.open();
clientChannel.configureBlocking(false);
}
catch (IOException e)
{
System.err.println(e);
}
}
public void addListener(SocketSubscriber subscriber)
{
try
{
DatagramChannel serverChannel = DatagramChannel.open();
serverChannel.configureBlocking(false);
DatagramSocket socket = serverChannel.socket();
socket.bind(new InetSocketAddress(dstPort));
SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ);
key.attach(subscriber);
}
catch (IOException e)
{
System.err.println(e);
}
}
public void send(ByteBuffer buffer)
{
try
{
clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort));
}
catch (IOException e)
{
System.err.println(e);
}
}
public void close()
{
try
{
clientChannel.close();
}
catch (IOException e)
{
System.err.println(e);
}
}
}
import java.nio.ByteBuffer;
public interface SocketSubscriber
{
public void onData(ByteBuffer data);
}
Example usage:
public class Test implements SocketSubscriber
{
public static void main(String[] args) throws Exception
{
UDPSocket.init();
UDPSocket test = new UDPSocket("localhost", 1234);
test.addListener(new Test());
UDPSocket test2 = new UDPSocket("localhost", 4321);
test2.addListener(new Test());
System.out.println("Listening...");
ByteBuffer buffer = ByteBuffer.allocate(500);
test.send(buffer);
buffer.rewind();
test2.send(buffer);
System.out.println("Data sent...");
Thread.sleep(5000);
UDPSocket.shutdown();
}
@Override
public void onData(ByteBuffer data)
{
System.out.println("Received " + data.limit() + " bytes of data.");
}
}