
Storm topology that reads data from Kafka and writes into Cassandra tables

In Storm, I am creating the Cassandra cluster connection and session in the bolt's prepare method:

cassandraCluster = Cluster.builder().withoutJMXReporting().withoutMetrics()
            .addContactPoints(nodes)
            .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
            .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L,
                    TimeUnit.MINUTES.toMillis(5)))
            .withLoadBalancingPolicy(
                    new TokenAwarePolicy(new RoundRobinPolicy()))
            .build();

session = cassandraCluster.connect(keyspace);

In the execute method I process the tuple and save it to a Cassandra table.

Suppose I want to write the data from a single tuple into multiple tables. Writing a separate bolt for each table seems like a good choice, but then I have to create a cluster connection and session for each table, in each bolt.
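One alternative to a bolt per table, sketched here rather than taken from the thread, is a single bolt that holds one session and writes each tuple to every target table. With the DataStax Java driver 3.x you would call `Session.prepare(String)` once per table in `prepare()`, then bind and execute those prepared statements in `execute()`. The helper below only builds the CQL `INSERT` strings, so it runs without a cluster; the table and column names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MultiTableInserts {
    // Builds "INSERT INTO <table> (c1, c2, ...) VALUES (?, ?, ...)" with one bind marker per column.
    static String insertCql(String table, List<String> columns) {
        List<String> markers = Collections.nCopies(columns.size(), "?");
        return "INSERT INTO " + table + " (" + String.join(", ", columns)
                + ") VALUES (" + String.join(", ", markers) + ")";
    }

    public static void main(String[] args) {
        // Hypothetical schema: each target table and the tuple fields it stores.
        Map<String, List<String>> targets = new LinkedHashMap<>();
        targets.put("events_by_user", Arrays.asList("user_id", "ts", "payload"));
        targets.put("events_by_day", Arrays.asList("day", "ts", "payload"));

        // In a bolt, each of these strings would be prepared once on the single session.
        for (Map.Entry<String, List<String>> t : targets.entrySet()) {
            System.out.println(insertCql(t.getKey(), t.getValue()));
        }
    }
}
```

Each tuple then costs one `execute` (or `executeAsync`) per table, all on the same session, so there is no need for extra Cluster objects just because there are extra tables.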

However, according to this article, using a single Cluster instance per Cassandra cluster is a good idea for performance: http://www.datastax.com/dev/blog/4-simple-rules-when-using-the-datastax-drivers-for-cassandra

Does anyone have an idea how to create the cluster connection in one bolt and reuse that connection in the other bolts?

I don't know enough about Apache Storm to comment here, but a quick look at the docs seems to show that you carry out discrete operations in "bolts". You might want to explain a bit more about this in your question for users of Cassandra and the drivers. It is correct that you should ideally keep one session open. If you can share objects between bolts, could you not make your session object a public object shared across every bolt? - markc
@markc It's not really possible to share objects across bolts, because bolts can be distributed across physically separate machines. The best one can do here is maintain one cluster/session per bolt, but that seems to contradict the best practices described in the link. I don't know enough about Cassandra to say whether that's OK. - Ryan Walker
@RyanWalker Right, thanks for clearing that up; that makes sense. It would probably make sense to create a session per bolt, then. A Cassandra cluster can have many clients connected; the reason it's advised to keep one session open is just to avoid repeatedly setting up and tearing down connections. As long as the bolt itself is long-lived, the session object can be a child of it, I would say. Once a connection is established, the driver provides a connection pool into the cluster. See: github.com/datastax/java-driver/tree/3.x/manual/pooling - markc

1 Answer


It depends on how Storm allocates the bolts and spouts to the workers. You can't assume that you can share connections between bolts, because they might be running in different workers (read: JVMs) or on different nodes entirely.
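To put rough numbers on the trade-off: Storm calls `prepare()` once per bolt task, so opening a session in `prepare()` gives one session per task, whereas sharing one session per worker JVM (for example through a static field, which Storm does not do for you) gives at most one session per worker. The sketch below just counts both for a hypothetical placement of tasks onto workers; the worker ids are made up.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

public class SessionCount {
    // One session per task: every task's prepare() builds its own Cluster/Session.
    static int perTaskSessions(List<String> workerOfEachTask) {
        return workerOfEachTask.size();
    }

    // One session per worker JVM: tasks placed on the same worker share it.
    static int perWorkerSessions(List<String> workerOfEachTask) {
        return new HashSet<>(workerOfEachTask).size();
    }

    public static void main(String[] args) {
        // Hypothetical placement: 3 table-writing bolts x parallelism 2 = 6 tasks on 2 workers.
        List<String> placement = Arrays.asList("w1", "w2", "w1", "w2", "w1", "w2");
        System.out.println(perTaskSessions(placement));
        System.out.println(perWorkerSessions(placement));
    }
}
```

For this placement that is 6 sessions versus 2; tasks on different workers still need their own session either way.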

See my answer here: Mongo connection pooling for Storm topology

It might look something like this (assuming Storm 1.x and the DataStax Java driver 3.x):

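import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;

public class CassandraBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(CassandraBolt.class);

    // Plain strings are serializable, so they travel with the bolt to the workers
    // (they could also be read from stormConf in prepare() instead of the constructor)
    private final String[] nodes;
    private final String keyspace;

    private OutputCollector _collector;

    // Cluster and Session are not serializable, so they have to be transient
    // and created in prepare(), which runs on the worker after deserialization
    private transient Cluster cassandraCluster;
    private transient Session _session;

    public CassandraBolt(String keyspace, String... nodes) {
        this.keyspace = keyspace;
        this.nodes = nodes;
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        _collector = collector;

        cassandraCluster = Cluster.builder().withoutJMXReporting().withoutMetrics()
            .addContactPoints(nodes)
            .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
            .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L,
                    TimeUnit.MINUTES.toMillis(5)))
            .withLoadBalancingPolicy(
                    new TokenAwarePolicy(new RoundRobinPolicy()))
            .build();

        _session = cassandraCluster.connect(keyspace);
    }

    @Override
    public void execute(Tuple input) {
        try {
            // use _session to talk to Cassandra, e.g. _session.execute(...)

            // BaseRichBolt does not ack automatically
            _collector.ack(input);
        } catch (Exception e) {
            LOG.error("CassandraBolt error", e);
            _collector.reportError(e);
            _collector.fail(input);
        }
    }

    @Override
    public void cleanup() {
        // Closing the Cluster also closes the sessions created from it.
        // Storm does not guarantee cleanup() runs if the worker is killed.
        if (cassandraCluster != null) {
            cassandraCluster.close();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: it emits nothing, so no output fields are declared
    }
}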
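If several Cassandra bolts happen to run in the same worker JVM, they can still share one session through a static field, even though bolts in other workers cannot. The class below is a hypothetical, reference-counted holder (not part of Storm or the DataStax driver): the first `acquire()` creates the resource, later calls reuse it, and the last `release()` closes it. It is shown with a stand-in resource so it runs on its own.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper: one shared resource per JVM (Storm worker), reference-counted
// so that the last task to release it closes it.
public final class SharedResource<T> {
    private final Supplier<T> factory;
    private final Consumer<T> closer;
    private T instance;
    private int refs;

    public SharedResource(Supplier<T> factory, Consumer<T> closer) {
        this.factory = factory;
        this.closer = closer;
    }

    // Intended to be called from prepare(): creates on first use, otherwise reuses.
    public synchronized T acquire() {
        if (refs == 0) {
            instance = factory.get(); // if this throws, refs stays at 0
        }
        refs++;
        return instance;
    }

    // Intended to be called from cleanup(): closes once no task holds it any more.
    public synchronized void release() {
        if (refs == 0) {
            throw new IllegalStateException("release() called without acquire()");
        }
        refs--;
        if (refs == 0) {
            closer.accept(instance);
            instance = null;
        }
    }

    public static void main(String[] args) {
        int[] created = {0};
        SharedResource<StringBuilder> shared = new SharedResource<>(
                () -> { created[0]++; return new StringBuilder("session"); },
                sb -> System.out.println("closed " + sb));

        StringBuilder a = shared.acquire(); // first bolt task in this worker
        StringBuilder b = shared.acquire(); // second bolt task in the same worker
        System.out.println(a == b);
        System.out.println(created[0]);
        shared.release();
        shared.release(); // last release closes the resource
    }
}
```

In a bolt you would keep something like a `private static final SharedResource<Session>` whose factory builds the Cluster and calls `connect(keyspace)`, and whose closer calls `session.getCluster().close()`. Because the field is static, the contact points and keyspace must be known without the bolt instance (for example as constants), and whichever task acquires first decides the configuration.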