
I am using DataStax Java driver 3.1.0 to connect to a Cassandra cluster running Cassandra 2.0.10. I am writing asynchronously with QUORUM consistency.

  public void save(final String process, final int clientid, final long deviceid) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          logger.logInfo("successfully written");
        }

        @Override
        public void onFailure(Throwable t) {
          logger.logError("error= ", t);
        }
      }, Executors.newFixedThreadPool(10));
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    }
  }

And below is my CacheStatement class:

public class CacheStatement {
  private static final Map<String, PreparedStatement> cache =
      new ConcurrentHashMap<>();

  private static class Holder {
    private static final CacheStatement INSTANCE = new CacheStatement();
  }

  public static CacheStatement getInstance() {
    return Holder.INSTANCE;
  }

  private CacheStatement() {}

  public BoundStatement getStatement(String cql) {
    Session session = CassUtils.getInstance().getSession();
    PreparedStatement ps = cache.get(cql);
    // no statement cached, create one and cache it now.
    if (ps == null) {
      synchronized (this) {
        ps = cache.get(cql);
        if (ps == null) {
          // assign ps as well; otherwise ps.bind() below throws a NullPointerException
          ps = session.prepare(cql);
          cache.put(cql, ps);
        }
      }
    }
    return ps.bind();
  }
}

My save method above will be called from multiple threads, and I think BoundStatement is not thread safe. By the way, the CacheStatement class is thread safe, as shown above.

  • Since BoundStatement is not thread safe, will there be any problem with the code above if I write asynchronously from multiple threads?
  • Second, I am passing Executors.newFixedThreadPool(10) as the executor argument to addCallback. Is this OK, or will it cause problems? Should I use MoreExecutors.directExecutor() instead? What is the difference between the two, and which is the best approach?

Below are my settings for connecting to Cassandra with the DataStax Java driver:

Builder builder = Cluster.builder();
cluster =
    builder
        .addContactPoints(servers.toArray(new String[servers.size()]))
        .withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE))
        .withPoolingOptions(poolingOptions)
        .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
        .withLoadBalancingPolicy(
            DCAwareRoundRobinPolicy.builder()
                .withLocalDc(
                    !TestUtils.isProd()
                        ? "DC2"
                        : TestUtils.getCurrentLocation().get().name().toLowerCase())
                .withUsedHostsPerRemoteDc(3)
                .build())
        .withCredentials(username, password)
        .build();
You're creating a new thread pool every time save is called. Instead, make a static or instance-level thread pool and reuse it. (Chris Lohfink)
Yeah, I did that after reading the answer below: I declared it as a final field at the top of the class and use that. In general, what is the difference between MoreExecutors.directExecutor() and a thread pool? (john)
@ChrisLohfink Is there any real benefit to using MoreExecutors.directExecutor() rather than a thread pool in the callback? Can you help me understand this? (john)
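A minimal sketch of the fix suggested in the comments, using only the JDK: the pool is created once and reused by every `save` call. `CallbackPool`, `EXECUTOR`, `distinctThreadsFor` and `shutdown` are hypothetical names, not driver or Guava APIs; in the question's code, `CallbackPool.EXECUTOR` would presumably replace `Executors.newFixedThreadPool(10)` as the third argument to `Futures.addCallback`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical holder: ONE callback pool for the whole application,
// instead of a new Executors.newFixedThreadPool(10) on every save() call.
final class CallbackPool {
  static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(10);

  private CallbackPool() {}

  // Demo helper: runs `tasks` trivial callbacks on the shared pool and returns
  // how many distinct worker threads handled them (never more than 10).
  static int distinctThreadsFor(int tasks) {
    Set<String> names = ConcurrentHashMap.newKeySet();
    CountDownLatch done = new CountDownLatch(tasks);
    for (int i = 0; i < tasks; i++) {
      EXECUTOR.execute(() -> {
        names.add(Thread.currentThread().getName());
        done.countDown();
      });
    }
    try {
      done.await(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return names.size();
  }

  // Call once when the application stops: the pool's threads are non-daemon
  // and would otherwise keep the JVM alive.
  static void shutdown() {
    EXECUTOR.shutdown();
    try {
      if (!EXECUTOR.awaitTermination(5, TimeUnit.SECONDS)) {
        EXECUTOR.shutdownNow();
      }
    } catch (InterruptedException e) {
      EXECUTOR.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }
}
```

However many callbacks are submitted, at most 10 worker threads ever exist, whereas a pool per call leaves behind idle threads that are never shut down.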

1 Answer


I think what you're doing is fine. You could optimize a bit further by preparing all the statements at application startup, so that everything is already cached: you then pay no statement-preparation cost when saving, and nothing on that path takes a lock.
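A minimal sketch of the startup-preparation idea, using only the JDK so it runs without a cluster. `StatementCache` is a hypothetical class; the type parameter `P` stands in for the driver's `PreparedStatement` and the `preparer` function for `session.prepare`. Cache hits are a plain `ConcurrentHashMap.get` with no locking, and a statement that was not prepared at startup is prepared at most once through `computeIfAbsent`.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache: P stands in for PreparedStatement, preparer for session::prepare.
final class StatementCache<P> {
  private final ConcurrentHashMap<String, P> cache = new ConcurrentHashMap<>();
  private final Function<String, P> preparer;

  StatementCache(Function<String, P> preparer, List<String> startupCql) {
    this.preparer = preparer;
    // Prepare every known statement up front, so save() never pays the prepare cost.
    for (String cql : startupCql) {
      cache.put(cql, preparer.apply(cql));
    }
  }

  P get(String cql) {
    // Fast path: lock-free read for statements prepared at startup.
    P ps = cache.get(cql);
    // Slow path: computeIfAbsent applies the preparer at most once per key,
    // even when several threads miss at the same time.
    return ps != null ? ps : cache.computeIfAbsent(cql, preparer);
  }
}
```

With the driver, this would presumably be built once as `new StatementCache<>(session::prepare, Arrays.asList(sql))` and used as `cache.get(sql).bind()` inside `save`. Note that `computeIfAbsent` holds an internal map lock while a statement is being prepared, which is one more reason to warm the cache at startup.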

BoundStatement is not thread-safe, but PreparedStatement is, and your getStatement returns a new BoundStatement on every call: PreparedStatement.bind() is essentially a shortcut for new BoundStatement(ps).bind(). Since no BoundStatement instance is ever shared between threads, your code is fine.
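A JDK-only sketch of the pattern just described: an immutable, shareable template hands out a fresh mutable object on every `bind()`, so each thread fills in its own copy. `Template` and `Bound` are hypothetical stand-ins for `PreparedStatement` and `BoundStatement`, not the driver's real classes.

```java
// Hypothetical stand-in for PreparedStatement: immutable, safe to share.
final class Template {
  final String cql;
  final int markers;

  Template(String cql, int markers) {
    this.cql = cql;
    this.markers = markers;
  }

  // Like PreparedStatement.bind(): returns a NEW mutable object on every call.
  Bound bind() {
    return new Bound(this);
  }
}

// Hypothetical stand-in for BoundStatement: mutable, one instance per request.
final class Bound {
  final Template template;
  private final Object[] values;

  Bound(Template template) {
    this.template = template;
    this.values = new Object[template.markers];
  }

  // No locking here: safety comes from never sharing a Bound between threads.
  Bound set(int index, Object value) {
    values[index] = value;
    return this;
  }

  Object get(int index) {
    return values[index];
  }
}
```

The same reasoning applies to `BoundStatement`: as long as each `save` call binds its own instance, its lack of internal synchronization does not matter.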

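As for the thread pool, you are creating a new one on every addCallback call, which wastes resources, and none of those pools is ever shut down. I don't use this callback style myself; I prefer handling the plain ResultSetFuture directly. However, I have seen examples in the DataStax documentation that use MoreExecutors.sameThreadExecutor() instead of MoreExecutors.directExecutor(). Both run the callback on the thread that completes the future (or on the calling thread if the future has already completed), which is usually one of the driver's internal I/O threads, so the callback must be short and non-blocking. sameThreadExecutor() is the older name that Guava later deprecated in favor of directExecutor().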
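A JDK-only sketch of the direct-executor vs. thread-pool difference asked about in the comments, using `CompletableFuture` as an analog of the Guava future. An `Executor` written as `Runnable::run` behaves like `MoreExecutors.directExecutor()`, while a pool hands the callback to one of its own threads. `CallbackThreads`, `callbackThread`, and the thread name `io-thread` (standing in for a driver I/O thread) are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class CallbackThreads {
  // Analog of MoreExecutors.directExecutor(): run the task on the calling thread.
  static final Executor DIRECT = Runnable::run;

  // Completes a "response" future on a thread named "io-thread" and returns
  // the name of the thread that actually ran the callback.
  static String callbackThread(Executor callbackExecutor) {
    CompletableFuture<String> ranOn = new CompletableFuture<>();
    CompletableFuture<String> response = new CompletableFuture<>();
    // Register the callback BEFORE the response completes, as with executeAsync.
    response.thenAcceptAsync(
        row -> ranOn.complete(Thread.currentThread().getName()), callbackExecutor);
    Thread io = new Thread(() -> response.complete("row"), "io-thread");
    io.start();
    return ranOn.join();
  }
}
```

`CallbackThreads.callbackThread(CallbackThreads.DIRECT)` returns `"io-thread"`, while a pool built with `Executors.newFixedThreadPool(1, r -> new Thread(r, "callback-pool"))` returns `"callback-pool"`. A direct executor avoids a thread hand-off, which suits a callback that only logs, as in the question; a shared pool is the safer choice when the callback might block.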