3
votes

I am trying to insert data from Storm into Cassandra. The data is initially of type String.

My Java class has the following code:

String insertQuery1
            = "insert into fault.as_fo_ag_uc ("
            + "host,"
            + "trigger,"
            + "eventtime,uuiddefault) "
            + "values(?,?,?,?)";    

BoundStatement boundStatement = new BoundStatement(statement);
boundStatement.setString(1, dto.getHost());
boundStatement.setString(2, dto.getTrigger());
Timestamp ts = Timestamp.valueOf(dto.getEventTime());
boundStatement.setDate(3, ts);
boundStatement.setString(4, dto.getUIDDefault());

Here I get an error saying that eventtime is of type timestamp. I have converted the value to a Timestamp, but it is not working. I have also tried using SimpleDateFormat to convert it to a date type:

DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,z");
boundStatement.setDate(3, new java.sql.Date(df.parse(dto.getEventTime()).getTime()));

The error is "EventTime is of type timestamp". My Cassandra driver version is 2.1.7.

com.datastax.driver.core.exceptions.InvalidTypeException: Value eventime is of type timestamp
    at com.datastax.driver.core.AbstractGettableByIndexData.checkType(AbstractGettableByIndexData.java:89) ~[cassandra-driver-core-2.1.7.1.jar:?]
    at com.datastax.driver.core.AbstractData.setString(AbstractData.java:157) ~[cassandra-driver-core-2.1.7.1.jar:?]
    at com.datastax.driver.core.BoundStatement.setString(BoundStatement.java:499) ~[cassandra-driver-core-2.1.7.1.jar:?]
    at storm.starter.bolt.CassandraWriterBolt.execute(CassandraWriterBolt.java:219) [classes/:?]
    at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) [storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:690) [storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436) [storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58) [storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:127) [storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:106) [storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) [storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.daemon.executor$fn__5694$fn__5707$fn__5758.invoke(executor.clj:819) [storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479) [storm-core-0.10.0.jar:0.10.0]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]

2
Please post the full stacktrace. - Sotirios Delimanolis
It's also unclear why you are creating Timestamp and java.sql.Date objects when setDate simply expects a java.util.Date. - Sotirios Delimanolis
@SotiriosDelimanolis The Java driver 2.1.7 has no setTimestamp function. - kinkajou
What? Why are you talking about a setTimestamp method? - Sotirios Delimanolis
@SotiriosDelimanolis I have declared my column as eventtime timestamp. - kinkajou

2 Answers

2
votes

The setXyz methods in BoundStatement that expect an int argument interpret that value as an index starting at 0.

So your

boundStatement.setString(2, dto.getTrigger());

is actually trying to set the placeholder for the eventtime column in the CQL query.
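
To make the off-by-one concrete, here is a small, driver-independent sketch that lists which column each zero-based index refers to for the INSERT in the question. The class name PlaceholderIndexDemo and the helper columnAt are made up for illustration; they are not part of the DataStax driver.

```java
import java.util.Arrays;
import java.util.List;

class PlaceholderIndexDemo {
    // Columns in the order they appear in the INSERT statement from the question.
    static final List<String> COLUMNS =
            Arrays.asList("host", "trigger", "eventtime", "uuiddefault");

    // Hypothetical helper: returns the column that a zero-based bind index refers to.
    public static String columnAt(int index) {
        return COLUMNS.get(index);
    }

    public static void main(String[] args) {
        // Print every bind index next to the column it targets.
        for (int i = 0; i < COLUMNS.size(); i++) {
            System.out.println(i + " -> " + columnAt(i));
        }
    }
}
```

With this mapping, index 2 is eventtime, which matches the InvalidTypeException raised from setString in the stack trace, and index 4 does not correspond to any placeholder.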

Change your statement to start at 0 and go up to 3.

BoundStatement boundStatement = new BoundStatement(statement);
boundStatement.setString(0, dto.getHost());
boundStatement.setString(1, dto.getTrigger());
Timestamp ts = Timestamp.valueOf(dto.getEventTime());
boundStatement.setDate(2, ts);
boundStatement.setString(3, dto.getUIDDefault());
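
One caveat with the snippet above: Timestamp.valueOf only accepts strings in the yyyy-[m]m-[d]d hh:mm:ss[.f...] format and throws IllegalArgumentException otherwise. If the event time arrives in a different shape, a plain java.util.Date can be produced with SimpleDateFormat instead. The following is a minimal sketch; the class name EventTimeParser, the pattern, and the UTC time zone are assumptions, since the question does not show what dto.getEventTime() actually returns.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

class EventTimeParser {
    // Assumed input pattern; adjust it to whatever dto.getEventTime() really returns.
    static final String PATTERN = "yyyy-MM-dd HH:mm:ss";

    // Parses the event time string into a java.util.Date, interpreting it as UTC.
    public static Date parseEventTime(String text) throws ParseException {
        SimpleDateFormat format = new SimpleDateFormat(PATTERN);
        format.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: timestamps are in UTC
        format.setLenient(false); // reject values such as month 13 instead of rolling over
        return format.parse(text);
    }

    public static void main(String[] args) throws ParseException {
        Date eventTime = parseEventTime("2016-01-15 10:30:00");
        System.out.println(eventTime.getTime()); // milliseconds since the epoch
    }
}
```

With the 2.1 driver used in the question, the resulting Date could then be bound with boundStatement.setDate(2, eventTime), since setDate takes a java.util.Date there.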

0
votes

If you want to or have to use QueryBuilder in the Java driver, you can call it using something like this:

// Driver 4.x query builder; asCql() renders the statement as a CQL string.
session.execute(insertInto("table_name")
  .value("userId", literal("123"))
  .value("created_at", function("toTimestamp", function("now")))
  .asCql());

(This assumes Cassandra 2.2 or later, which is needed for toTimestamp; see the 2.2 docs.)