1
votes

I've been struggling the all day and didn't find a solution.

I'm trying to connect a remote Cassandra node via a Spark Streaming application, using the spark-cassandra connector and the application exists with an exception. Any help would be much appreciated.

2015-02-17 19:13:58 DEBUG Connection:114 - Connection[/<MY_PUBLIC_IP>:9042-2, inFlight=0, closed=false] Transport initialized and ready
2015-02-17 19:13:58 DEBUG ControlConnection:492 - [Control connection] Refreshing node list and token map
2015-02-17 19:13:59 DEBUG ControlConnection:262 - [Control connection] Refreshing schema
2015-02-17 19:14:00 DEBUG ControlConnection:492 - [Control connection] Refreshing node list and token map
2015-02-17 19:14:00 DEBUG ControlConnection:172 - [Control connection] Successfully connected to /<MY_PUBLIC_IP>:9042
2015-02-17 19:14:00 INFO  Cluster:1267 - New Cassandra host /<MY_PUBLIC_IP>:9042 added
2015-02-17 19:14:00 INFO  CassandraConnector:51 - Connected to Cassandra cluster: Test Cluster
2015-02-17 19:14:00 INFO  LocalNodeFirstLoadBalancingPolicy:59 - Adding host <MY_PUBLIC_IP> (datacenter1)
2015-02-17 19:14:01 DEBUG Connection:114 - Connection[/<MY_PUBLIC_IP>:9042-3, inFlight=0, closed=false] Transport initialized and ready
2015-02-17 19:14:01 DEBUG Session:304 - Added connection pool for /<MY_PUBLIC_IP>:9042
2015-02-17 19:14:01 INFO  LocalNodeFirstLoadBalancingPolicy:59 - Adding host <MY_PUBLIC_IP> (datacenter1)
2015-02-17 19:14:01 DEBUG Schema:55 - Retrieving database schema from cluster Test Cluster...
2015-02-17 19:14:01 DEBUG Schema:55 - 1 keyspaces fetched from cluster Test Cluster: {vehicles}
2015-02-17 19:14:02 DEBUG CassandraConnector:55 - Attempting to open thrift connection to Cassandra at <MY_PUBLIC_IP>:9160
2015-02-17 19:14:02 DEBUG Connection:428 - Connection[/<MY_PUBLIC_IP>:9042-3, inFlight=0, closed=true] closing connection
2015-02-17 19:14:02 DEBUG Cluster:1340 - Shutting down
2015-02-17 19:14:02 DEBUG Connection:428 - Connection[/<MY_PUBLIC_IP>:9042-2, inFlight=0, closed=true] closing connection
2015-02-17 19:14:02 INFO  CassandraConnector:51 - Disconnected from Cassandra cluster: Test Cluster
2015-02-17 19:14:03 DEBUG CassandraConnector:55 - Attempting to open thrift connection to Cassandra at <AWS_LOCAL_IP>:9160
2015-02-17 19:14:10 DEBUG HeartbeatReceiver:50 - [actor] received message Heartbeat(localhost,[Lscala.Tuple2;@77008370,BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$b]
2015-02-17 19:14:10 DEBUG BlockManagerMasterActor:50 - [actor] received message BlockManagerHeartbeat(BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$c]
2015-02-17 19:14:10 DEBUG BlockManagerMasterActor:56 - [actor] handled message (0.491517 ms) BlockManagerHeartbeat(BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$c]
2015-02-17 19:14:10 DEBUG HeartbeatReceiver:56 - [actor] handled message (69.725123 ms) Heartbeat(localhost,[Lscala.Tuple2;@77008370,BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$b]
2015-02-17 19:14:20 DEBUG HeartbeatReceiver:50 - [actor] received message Heartbeat(localhost,[Lscala.Tuple2;@70a7cd6e,BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$d]
2015-02-17 19:14:20 DEBUG BlockManagerMasterActor:50 - [actor] received message BlockManagerHeartbeat(BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$e]
2015-02-17 19:14:20 DEBUG BlockManagerMasterActor:56 - [actor] handled message (0.348586 ms) BlockManagerHeartbeat(BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$e]
2015-02-17 19:14:20 DEBUG HeartbeatReceiver:56 - [actor] handled message (2.020429 ms) Heartbeat(localhost,[Lscala.Tuple2;@70a7cd6e,BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$d]
2015-02-17 19:14:24 ERROR ServerSideTokenRangeSplitter:88 - Failure while fetching splits from Cassandra
java.io.IOException: Failed to open thrift connection to Cassandra at <AWS_LOCAL_IP>:9160
    at com.datastax.spark.connector.cql.CassandraConnector.createThriftClient(CassandraConnector.scala:132)
    at com.datastax.spark.connector.cql.CassandraConnector.withCassandraClientDo(CassandraConnector.scala:141)
    at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter.com$datastax$spark$connector$rdd$partitioner$ServerSideTokenRangeSplitter$$fetchSplits(ServerSideTokenRangeSplitter.scala:33)
    at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter$$anonfun$1$$anonfun$apply$2.apply(ServerSideTokenRangeSplitter.scala:45)
    at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter$$anonfun$1$$anonfun$apply$2.apply(ServerSideTokenRangeSplitter.scala:45)
    at scala.util.Try$.apply(Try.scala:161)
    at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter$$anonfun$1.apply(ServerSideTokenRangeSplitter.scala:45)
    at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter$$anonfun$1.apply(ServerSideTokenRangeSplitter.scala:44)
    at scala.collection.immutable.Stream.map(Stream.scala:376)
    at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter.split(ServerSideTokenRangeSplitter.scala:44)
    at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$$anonfun$com$datastax$spark$connector$rdd$partitioner$CassandraRDDPartitioner$$splitsOf$1.apply(CassandraRDDPartitioner.scala:77)
    at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$$anonfun$com$datastax$spark$connector$rdd$partitioner$CassandraRDDPartitioner$$splitsOf$1.apply(CassandraRDDPartitioner.scala:76)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.flatmap2combiner(ParArray.scala:418)
    at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1075)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1071)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
    at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
    at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:444)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:514)
    at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:492)
    at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:64)
    at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:961)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:956)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection timed out: connect
    at org.apache.thrift.transport.TSocket.open(TSocket.java:185)
    at org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81)
    at org.apache.cassandra.thrift.TFramedTransportFactory.openTransport(TFramedTransportFactory.java:41)
    at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createThriftClient(CassandraConnectionFactory.scala:47)
    at com.datastax.spark.connector.cql.CassandraConnector.createThriftClient(CassandraConnector.scala:127)
    ... 41 more
Caused by: java.net.ConnectException: Connection timed out: connect
    at java.net.DualStackPlainSocketImpl.connect0(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:79)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at org.apache.thrift.transport.TSocket.open(TSocket.java:180)
    ... 45 more
Exception in thread "main" java.io.IOException: Failed to fetch splits of TokenRange(0,0,Set(CassandraNode(/<AWS_LOCAL_IP>,/<MY_PUBLIC_IP>)),None) from all endpoints: CassandraNode(/<AWS_LOCAL_IP>,/<MY_PUBLIC_IP>)
    at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter$$anonfun$split$2.apply(ServerSideTokenRangeSplitter.scala:55)
    at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter$$anonfun$split$2.apply(ServerSideTokenRangeSplitter.scala:49)
    at scala.Option.getOrElse(Option.scala:120)
    at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter.split(ServerSideTokenRangeSplitter.scala:49)
    at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$$anonfun$com$datastax$spark$connector$rdd$partitioner$CassandraRDDPartitioner$$splitsOf$1.apply(CassandraRDDPartitioner.scala:77)
    at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$$anonfun$com$datastax$spark$connector$rdd$partitioner$CassandraRDDPartitioner$$splitsOf$1.apply(CassandraRDDPartitioner.scala:76)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.flatmap2combiner(ParArray.scala:418)
    at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1075)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1071)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
    at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
    at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:444)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:514)
    at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:492)
    at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:64)
    at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:961)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:956)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2015-02-17 19:14:24 DEBUG DiskBlockManager:63 - Shutdown hook called

At the beginning it looks fine (connection succeeds, keyspace is fetched...) but then, when it attempts to open thrift connection, it fails, disconnects and shut down.

I've opened ports 9160, 9042 and 7000.

And in cassandra.yaml I set

listen_address: <AWS_LOCAL_IP>
broadcast_address: <MY_PUBLIC_IP>

What am I missing?

1

1 Answers

3
votes

Ok, I was about to post this question but then I finally worked it out:

In cassandra.yaml, I had to set

rpc_adress = 0.0.0.0

Other stackoverflow questions helped me, but I'm posting this because the stack trace may help others to find it.