I need to write a Storm spout that reads data from a port, and I want to know whether that is possible.
With that in mind, I designed a simple topology with one spout and one bolt. The spout would gather HTTP requests sent using wget, and the bolt would simply display each request.
My spout structure is as follows:
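import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
// Storm classes come from org.apache.storm.* (backtype.storm.* before Storm 1.0)

public class ProxySpout extends BaseRichSpout {
    // The output collector
    SpoutOutputCollector collector;
    // The socket
    Socket clientSocket;
    // The server socket
    ServerSocket serverSocket;

    // Both sockets are created here, in the constructor
    public ProxySpout(int port) {
        try {
            this.serverSocket = new ServerSocket(port);
            clientSocket = serverSocket.accept();
        } catch (IOException ex) {
            // Handle it
        }
    }

    public void nextTuple() {
        try {
            InputStream ic = clientSocket.getInputStream();
            byte[] b = new byte[8196];
            int len = ic.read(b);
            collector.emit(new Values(b));
            ic.close();
        } catch (IOException ex) {
            // Handle it
        } finally {
            try {
                clientSocket.close();
            } catch (IOException ex) {
                // Ignore
            }
        }
    }
}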
I have implemented the rest of the methods too.
When I turn this into a topology and run it, I get an error when I send the first request:
java.lang.RuntimeException: java.io.NotSerializableException: java.net.Socket
I just need to know whether there is something wrong with the way I am implementing this spout. Is it even possible for a spout to collect data from a port, or to act as a proxy?
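For context, as far as I understand it, Storm serializes the spout object with plain Java serialization before shipping it to the workers, so any non-transient field holding a Socket or ServerSocket triggers this exception. The sketch below reproduces that without Storm at all. The class names (SerializationDemo, EagerHolder, LazyHolder) are made up for illustration and are not part of any library.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.ServerSocket;

public class SerializationDemo {
    // Hypothetical stand-in for a spout that opens its socket in the constructor.
    static class EagerHolder implements Serializable {
        private final ServerSocket serverSocket;

        EagerHolder() throws IOException {
            this.serverSocket = new ServerSocket(0); // port 0 = any free port
        }

        void close() throws IOException {
            serverSocket.close();
        }
    }

    // Hypothetical stand-in for a spout that only stores the port number.
    static class LazyHolder implements Serializable {
        private final int port;
        private transient ServerSocket serverSocket; // skipped by serialization

        LazyHolder(int port) {
            this.port = port;
        }
    }

    // Returns true if plain Java serialization accepts the object.
    static boolean canSerialize(Object o) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException ex) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        EagerHolder eager = new EagerHolder();
        System.out.println("eager: " + canSerialize(eager)); // prints "eager: false"
        eager.close();
        System.out.println("lazy: " + canSerialize(new LazyHolder(8080))); // prints "lazy: true"
    }
}
```

If this is what happens inside Storm, then keeping only the port number as serializable state and creating the sockets later on the worker should avoid the exception.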
Edit
Got it working.
The code is:
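import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.Map;
// Storm classes come from org.apache.storm.* (backtype.storm.* before Storm 1.0)

public class ProxySpout extends BaseRichSpout {
    // Instance field rather than static: static fields are not serialized with the spout
    private final int _port;
    // Created on the worker in open()/nextTuple(); transient so they are never serialized
    private transient SpoutOutputCollector _collector;
    private transient ServerSocket _serverSocket;
    private transient Socket _clientSocket;

    public ProxySpout(int port) {
        _port = port;
    }
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        try {
            _serverSocket = new ServerSocket(_port);
        } catch (IOException ex) {
            throw new RuntimeException("Could not listen on port " + _port, ex);
        }
    }
    public void nextTuple() {
        try {
            _clientSocket = _serverSocket.accept(); // blocks until a client connects
            InputStream incomingIS = _clientSocket.getInputStream();
            byte[] b = new byte[8196];
            int len = incomingIS.read(b);
            if (len > 0) {
                // Emit only the bytes that were actually read
                _collector.emit(new Values(Arrays.copyOf(b, len)));
            }
            _clientSocket.close();
        } catch (IOException ex) {
            // Handle it
        }
    }
}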
As per @Shaw's suggestion, I tried initializing _serverSocket in the open() method, while _clientSocket is accepted in the nextTuple() method to listen for requests.
I don't know the performance characteristics of this one, but it works. :-)
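One likely performance concern: the Storm documentation says nextTuple() should be non-blocking, whereas accept() blocks until a client connects. A possible way around that, sketched below under my own assumptions, is to accept connections on a background thread and hand the request bytes over through a queue. RequestListener, its queue capacity of 1000, and its method names are hypothetical and not part of Storm.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper, not part of Storm: accepts connections on its own thread.
public class RequestListener implements AutoCloseable {
    private final ServerSocket serverSocket;
    // Bounded queue (capacity is an arbitrary choice) so a slow consumer cannot exhaust memory
    private final BlockingQueue<byte[]> requests = new LinkedBlockingQueue<>(1000);
    private final Thread acceptor;

    public RequestListener(int port) throws IOException {
        serverSocket = new ServerSocket(port);
        acceptor = new Thread(this::acceptLoop, "request-acceptor");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    private void acceptLoop() {
        while (!serverSocket.isClosed()) {
            try (Socket client = serverSocket.accept()) {
                InputStream in = client.getInputStream();
                byte[] buf = new byte[8196];
                int len = in.read(buf);
                if (len > 0) {
                    // offer() drops the request if the queue is full
                    requests.offer(Arrays.copyOf(buf, len));
                }
            } catch (IOException ex) {
                // accept() throws once the server socket is closed; the loop then exits
            }
        }
    }

    // Non-blocking: returns null when nothing has arrived yet.
    public byte[] poll() {
        return requests.poll();
    }

    public int getLocalPort() {
        return serverSocket.getLocalPort();
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
    }
}
```

In the spout, the idea would be to create the listener in open(), call poll() from nextTuple(), and emit only when the result is non-null. I have not measured this against the blocking version.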