
I have a problem retrieving data from a Cassandra table using the Spark Cassandra Connector. I created a keyspace called "ks" and a table "student" in Cassandra. The table looks like this:

 id | name
----+-----------
 10 | Catherine
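
The schema behind this output is along these lines (a reconstruction; the exact CQL statements aren't shown here):

    CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

    CREATE TABLE ks.student (
        id int PRIMARY KEY,
        name text
    );

    INSERT INTO ks.student (id, name) VALUES (10, 'Catherine');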

I started Spark locally by running start-all.sh.

Then I created the class "SparkCassandraConnector", which connects Spark to Cassandra. What I am trying to do is retrieve the data from the student table and print it on the screen.

The error I get is:

java.lang.ClassNotFoundException: SparkCassandraConnector$Student
    java.net.URLClassLoader$1.run(URLClassLoader.java:372)
    java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    java.security.AccessController.doPrivileged(Native Method)
    java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    java.lang.Class.forName0(Native Method)
    java.lang.Class.forName(Class.java:340)

This is my program:

import org.apache.commons.lang.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.Serializable;

import static com.datastax.spark.connector.CassandraJavaUtil.javaFunctions;

public class SparkCassandraConnector implements Serializable {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("Simple Application");
        conf.setMaster("spark://127.0.0.1:7077");
        conf.set("spark.cassandra.connection.host", "127.0.0.1");

        // Dependency jars shipped to the executors.
        String[] jars = new String[6];
        jars[0] = "~/.m2/repository/com/datastax/spark/spark-cassandra-connector-java_2.10/1.1.0-alpha4/spark-cassandra-connector-java_2.10-1.1.0-alpha4.jar";
        jars[1] = "~/.m2/repository/com/datastax/cassandra/cassandra-driver-core/2.1.0/cassandra-driver-core-2.1.0.jar";
        jars[2] = "~/.m2/repository/com/datastax/spark/spark-cassandra-connector_2.10/1.1.0-alpha4/spark-cassandra-connector_2.10-1.1.0-alpha4.jar";
        jars[3] = "~/.m2/repository/com/datastax/cassandra/cassandra-driver-core/2.1.0/cassandra-driver-core-2.1.0.jar";
        jars[4] = "~/.m2/repository/org/apache/cassandra/cassandra-thrift/2.1.0/cassandra-thrift-2.1.0.jar";
        jars[5] = "~/.m2/repository/org/apache/cassandra/cassandra-clientutil/2.1.0/cassandra-clientutil-2.1.0.jar";
        conf = conf.setJars(jars);

        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read the student table into Student beans and render each row as a string.
        JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("ks", "student", Student.class)
                .map(new org.apache.spark.api.java.function.Function<Student, String>() {
                    @Override
                    public String call(Student student) throws Exception {
                        return student.toString();
                    }
                });
        System.out.println("Data as Student beans:\n" + StringUtils.join(rdd.collect(), "\n"));
    }
    public static class Student implements Serializable {

        private Integer id;
        private String name;

        public Student() {
        }

        public Student(Integer id, String name) {
            this.id = id;
            this.name = name;
        }

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        // Without this override, printing a Student falls back to
        // Object.toString() and shows only a class name and hash code.
        @Override
        public String toString() {
            return "Student{id=" + id + ", name='" + name + "'}";
        }
    }
}

This is my POM file:

<dependencies>

    <!-- Spark -->

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.1.0-alpha4</version>
    </dependency>

    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.1.0-alpha4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-catalyst_2.10</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>
Comments:

I suggest you go forward with the assembly plugin; you can read the motivation for it and an example with sbt here: eugenezhulenev.com/blog/2014/10/18/… – Eugene Zhulenev

Thank you for the link, but do you agree that I provided the necessary jar files and shouldn't be getting that error? – sia

1 Answer


The jar containing the job itself, and hence Student.class, is missing from the provided jars. A quick fix is to add the jar that sits in your project's ./target folder.
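
A minimal sketch of that quick fix, assuming your build produces target/spark-cassandra-demo-1.0.jar (a hypothetical name; use whatever jar your Maven build actually puts in ./target). You can either append that path to the array passed to setJars, or call addJar on the context:

    JavaSparkContext sc = new JavaSparkContext(conf);
    // Ship the application jar itself to the executors so that
    // SparkCassandraConnector$Student can be loaded on the worker side.
    sc.addJar("target/spark-cassandra-demo-1.0.jar");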

An alternative is to package your job and all of its dependencies into an 'uber jar' and declare that uber jar as the only jar. Look into the Maven Shade plugin.
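
A minimal sketch of the shade plugin configuration (version 2.3 is roughly contemporary with the dependency versions above; adjust as needed):

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <!-- Build the uber jar during mvn package -->
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Running mvn package then leaves a shaded jar in ./target that bundles the job and its dependencies. In practice Spark itself is usually marked as a provided dependency so it is not re-bundled into the uber jar.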

Jars can also be provided on the command line via spark-submit's --jars option.
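
For example (the class name matches the question; the jar paths are placeholders for your actual artifacts):

    spark-submit --class SparkCassandraConnector \
        --master spark://127.0.0.1:7077 \
        --jars ~/.m2/repository/com/datastax/spark/spark-cassandra-connector_2.10/1.1.0-alpha4/spark-cassandra-connector_2.10-1.1.0-alpha4.jar \
        target/spark-cassandra-demo-1.0.jar

The application jar passed as the last argument is shipped to the executors automatically, which by itself takes care of the missing Student class.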