4
votes

I am using spark-cassandra-connector 1.1.0 provided by datastax. I noticed intereting problem and I am not sure why something like this is happening: When I broadcast cassandra connector and try to use it on executors I am receving exception suggesting that my configuration is invalid cannot connect to Cassandra at 0.0.0.

Examplary stacktrace:

java.io.IOException: Failed to open native connection to Cassandra at {0.0.0.0}:9042
        at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:174)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160)
        at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
        at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
        at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:71)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:97)
...

But If I use it without broadcasting everything is working fine.

What is also strange for me, on the driver side broadcasted value print proper configuration but on executor side not.

Driver side:

  val dbConf = ssc.sparkContext.getConf
  val connector = CassandraConnector(dbConf)
  println(connector.hosts) //Set(10.20.1.5) 
  val broadcastedConnector = ssc.sparkContext.broadcast(connector)
  println(broadcastedConnector.value.hosts) //Set(10.20.1.5) 

Executor side:

mapPartition{
...
 println(broadcastedConnector.hosts) // Set(0.0.0.)
...
}

Could someone explain why it is working in such a way and how to broadcast Cassandra connector in a way that could be used on executors side.

Update The same problem is in 1.2.3 version of connector.

1

1 Answers

3
votes

There is no reason to broadcast the Cassandra Connector. Using it within a parallelized closure will only serialize the configuration and create a new connection on the executor or use an existing executor connection if it exists.