I use DataStax Enterprise 4.5. I hope I configured it correctly; I followed the instructions on the DataStax website. I can write to the Cassandra DB from a Windows service, and that works, but I can't query with Spark using the where function.

I start the Cassandra node (there is only one, for testing purposes) with "./dse cassandra -k -t" (in the /bin folder), so Hadoop and Spark are both running. I can write into Cassandra without a problem.

You cannot use a 'where' clause in a Cassandra query when the 'where' column isn't the row key, so I need to use Spark/Shark. I can start Shark (./dse shark) and run all the queries I need, but I need to write a standalone program in Scala or Java.

So I tried this link: https://github.com/datastax/spark-cassandra-connector

And I can query a simple statement like:

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._ // adds cassandraTable to SparkContext

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "MY_IP")
  .setMaster("spark://MY_IP:7077")
  .setAppName("SparkTest")

// Connect to the Spark cluster:
lazy val sc = new SparkContext(conf)

val rdd = sc.cassandraTable("keyspace", "tablename")
println(rdd.first)

This works well, but if I ask for more rows or a count:

println(rdd.count)
rdd.toArray.foreach(println)

then I get this exception:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

When I try this in Java I have the same problem. Does anyone know this problem? I don't know whether the DB config is correct or whether the Scala/Java program is correct. Maybe some ports are blocked, but 7077 and 4040 are open.

Side note: If I start Spark on the Cassandra DB, I can do queries like:

sc.cassandraTable("test","words").select("word").toArray.foreach(println) 

But if I use a "where" clause like:

sc.cassandraTable("test","words").select("word").where("word = ?","foo").toArray.foreach(println)

I get this exception:

java.io.IOException: Exception during query execution: SELECT "word" FROM "test"."words" WHERE token("word") > 0 AND word = ? ALLOW FILTERING

Do you have any idea why? I thought I could use where clauses in Spark.

Thank you!

2

2 Answers

2
votes
All masters are unresponsive!

This implies that the IP you are attempting to connect to is not actually bound by Spark, so this is basically a network configuration error. Check which interfaces are listening on port 7077 and make sure you are connecting to the correct one.
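
One quick way to test this from the client machine is to attempt a plain TCP connection to the master address. The following is only a minimal sketch: the class name PortCheck and the method canConnect are made up for illustration, and 7077 is simply the port from the question. It only tells you whether something accepts connections on that address, not whether it is the Spark master.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are placeholders; pass the master IP and port as arguments.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 7077;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
    }
}
```

If this reports the port as unreachable for the IP used in setMaster but reachable for another address of the same machine, the master is probably bound to a different interface than the one you are connecting to.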

As for the second question, the where operator implies that you are going to do a predicate pushdown on that clause. Currently you cannot do this with primary keys. If you want to restrict on a single primary key you can use a filter instead, but you will not see great performance, because this does a whole-table scan.

0
votes

So far this is my solution. It does not answer all of my questions, but it works for me and I want to share it with you.

I use the Hive JDBC driver to access a SharkServer from Java. How it works:

Start sharkserver: bin/dse shark --service sharkserver -p <port>

Dependencies for Maven:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>0.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>0.20.2</version>
</dependency>

Java Code:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class HiveJdbcClient {
  // HiveServer1-style driver class, which matches the jdbc:hive:// URL below.
  private static final String DRIVER_NAME = "org.apache.hadoop.hive.jdbc.HiveDriver";

  public static void main(String[] args) throws SQLException {
    try {
      Class.forName(DRIVER_NAME);
    } catch (ClassNotFoundException e) {
      e.printStackTrace();
      System.exit(1);
    }

    String sql = "SELECT * FROM keyspace.colFam WHERE name = 'John'";

    // try-with-resources closes the connection, statement, and result set.
    try (Connection con = DriverManager.getConnection("jdbc:hive://YOUR_IP:YOUR_PORT/default", "", "");
         Statement stmt = con.createStatement();
         ResultSet res = stmt.executeQuery(sql)) {
      while (res.next()) {
        System.out.println(res.getString("name"));
      }
    }
  }
}