I'm trying to integrate Storm (see here) into my project. I grok the concepts of topologies, spouts, and bolts. But now, I'm trying to figure out the actual implementation of a few things.
A) I have a polyglot environment with Java and Clojure. My Java code is a callback class with methods firing streaming data. The event data pushed to those methods is what I want to use as a spout.
So the first question is how to connect the data coming into those methods to a spout. I'm trying to i) pass a backtype.storm.topology.IRichSpout, then ii) pass a backtype.storm.spout.SpoutOutputCollector (see here) to that spout's open function (see here). But I can't see a way to actually pass in any kind of map or list.
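To make A) concrete, this is roughly the hand-off I have in mind, sketched in plain Java with no Storm dependency. CallbackBridge, onEvent, and poll are hypothetical names of mine, and the sketch assumes the callback and the spout run in the same JVM so that they can share the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical bridge between a push-style callback and a pull-style spout. */
class CallbackBridge {
    // Thread-safe buffer shared by the callback thread and the spout thread.
    private final BlockingQueue<List<Object>> queue = new LinkedBlockingQueue<>();

    /** Called from the Java callback each time an event arrives. */
    void onEvent(int id, String payload) {
        List<Object> values = new ArrayList<>();
        values.add(id);
        values.add(payload);
        queue.offer(values); // never blocks: the queue is unbounded
    }

    /** Non-blocking read: returns the oldest buffered event, or null if none is waiting. */
    List<Object> poll() {
        return queue.poll();
    }
}
```

A spout's nextTuple() could then call poll() and, when the result is non-null, hand it to the collector's emit, though I haven't confirmed that this is the idiomatic way to do it.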
B) The rest of my project is all Clojure. There will be a lot of data coming through those methods. Each event will have an ID between 1 and 100. In Clojure, I'll want to split the data coming from the spout into different threads of execution. Those, I think, will be the bolts.
How can I set up a Clojure bolt to take event data from the spout, then break off a thread based on the ID of the incoming event?
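To make B) concrete, the behaviour I'm after is that every event with a given ID always lands on the same worker. The sketch below models that in plain Java. IdRouter and its methods are hypothetical, and the modulo rule is only my assumption about how such routing could work, not Storm's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of ID-based routing; this is not Storm code. */
class IdRouter {
    // One inbox per worker; each inbox collects the payloads routed to it.
    private final List<List<String>> workers = new ArrayList<>();

    IdRouter(int numWorkers) {
        for (int i = 0; i < numWorkers; i++) {
            workers.add(new ArrayList<>());
        }
    }

    /** Index of the worker that owns the given event ID (same ID, same worker). */
    int workerFor(int id) {
        return Math.floorMod(Integer.hashCode(id), workers.size());
    }

    /** Deliver a payload to the worker that owns its ID. */
    void route(int id, String payload) {
        workers.get(workerFor(id)).add(payload);
    }

    /** Payloads received so far by the given worker, in arrival order. */
    List<String> received(int worker) {
        return workers.get(worker);
    }
}
```

My understanding is that Storm's fields grouping gives this same-ID, same-task guarantee, so perhaps the bolt should subscribe with a grouping on an ID field rather than :shuffle.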
Thanks in advance Tim
[EDIT 1]
I've actually gotten past this problem. I ended up 1) implementing my own IRichSpout. I then 2) connected that spout's internal tuple to the incoming stream data in my Java callback class. I'm not sure if this is idiomatic, but it compiles and runs without error. However, 3) I don't see the incoming stream data (which is definitely there) coming through the printstuff bolt.
In order to ensure that the event data gets propagated, is there something specific I have to do in the spout or bolt implementation or topology definition? Thanks.
    ;; tie Java callbacks to a Spout that I created
    (.setSpout java-callback ibspout)

    (storm/defbolt printstuff ["word"] [tuple collector]
      (println (str "printstuff --> tuple[" tuple "] > collector[" collector "]")))

    (storm/topology
      {"1" (storm/spout-spec ibspout)}
      {"3" (storm/bolt-spec {"1" :shuffle} printstuff)})
[EDIT 2]
On the advice of SO member Ankur, I'm rejigging my topology. After I've created my Java callback, I pass its tuple to the IBSpout below, using (.setTuple ibspout (.getTuple java-callback)). I don't pass the entire Java callback object, because I get a NotSerializable error. Everything compiles and runs without error. But again, there's no data coming to my printstuff bolt. Hmmm.
    public class IBSpout implements IRichSpout {

        /**
         * Storm spout stuff
         */
        private SpoutOutputCollector _collector;
        private List _tuple = new ArrayList();

        public void setTuple(List tuple) { _tuple = tuple; }
        public List getTuple() { return _tuple; }

        /**
         * Storm ISpout interface functions
         */
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
        }
        public void close() {}
        public void activate() {}
        public void deactivate() {}
        public void nextTuple() {
            _collector.emit(_tuple);
        }
        public void ack(Object msgId) {}
        public void fail(Object msgId) {}

        public void declareOutputFields(OutputFieldsDeclarer declarer) {}
        public java.util.Map getComponentConfiguration() { return new HashMap(); }
    }