0 votes

I have a Cassandra cluster with two nodes. I have set up a Spark job to query this Cassandra cluster, which has 3,651,568 keys.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.rdd.ReadConf

val conf = new SparkConf(true).set("spark.cassandra.connection.host", "hostname")
val sc = new SparkContext(conf)

val spark = SparkSession.builder().master("local").appName("Spark_Cassandra").config("spark.cassandra.connection.host", "hostname").getOrCreate()

// cassandraFormat takes the table name first, then the keyspace
val studentsDF = spark.read.cassandraFormat("tablename", "keyspacename").options(ReadConf.SplitSizeInMBParam.option(32)).load()
studentsDF.show(1000)

I am able to query the first 1000 rows, but I am unable to find a way to read from the 1001st row to the 2000th row, so that I can read data batch-wise from the Cassandra table using the Spark job.

As per the recommendation, I started using the Java driver.

Here is the complete explanation.

I have to query the Cassandra database using the DataStax Java driver. I am using DataStax Java driver version cassandra-java-driver-3.5.1 and Apache Cassandra version apache-cassandra-3.0.9. I have tried resolving the dependencies by installing jars, and I have also checked the yaml file: seeds, listen_address, and rpc_address are all pointing to my host, and start_native_transport is set to true. Here is my Java code to establish a connection to the Cassandra database:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class Started {
    public void connect() {
        try {
            // Build the cluster from the contact point and tighten the read timeout
            Cluster cluster = Cluster.builder().addContactPoints("***.***.*.*").build();
            cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(2000);
            System.out.println("Connected to cluster:");

            Session session = cluster.connect("demo");
            Row row = session.execute("SELECT ename FROM demo.emp").one();
            System.out.println(row.getString("ename"));
            cluster.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Started st = new Started();
        st.connect();
    }
}


I have only one node in the Cassandra cluster and it is up and running. I am also able to cqlsh to it on port 9042. So far so good, but when I run my Java program I get this error/exception message:

Connected to cluster:

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /***.***.*.*:9042 (com.datastax.driver.core.exceptions.TransportException: [/***.***.*.*:9042] Cannot connect))
            at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:232)
            at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
            at com.datastax.driver.core.Cluster$Manager.negotiateProtocolVersionAndConnect(Cluster.java:1631)
            at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1549)
            at com.datastax.driver.core.Cluster.init(Cluster.java:160)
            at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:342)
            at com.datastax.driver.core.Cluster.connect(Cluster.java:292)
            at Started.connect(Started.java:22)
            at Started.main(Started.java:34)


Can anyone please help?


2 Answers

1 vote

This may be a bad fit for Spark. show, for example, displays just 1000 records, but the order of the records is not guaranteed; multiple invocations could produce different results.

Your best bet within Spark is to get the results as a local iterator if you want to page through them, but again, this is probably not the best way to do things. Spark is a system for working on data on a remote cluster, which means doing your processing within the DataFrame API.

If you really just want to slowly page through records, you can use toLocalIterator to grab batches back to your driver machine (not recommended). But you could accomplish something similar by just doing a SELECT * with the Java driver. The result set iterator returned to you will page through results automatically as you progress through them.

Example of using the Java Driver's Paging

https://docs.datastax.com/en/developer/java-driver/3.2/manual/paging/

ResultSet rs = session.execute("your query");
for (Row row : rs) {
    // Process the row ...
    // By default this will only pull a new "page" of data from Cassandra
    // when the previous page has been fully iterated through. See the
    // docs for more details.
}
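
If you want the driver's pages to line up with a fixed batch size, you can also set the fetch size explicitly. Here is a minimal sketch in Scala against the 3.x Java driver, reusing the demo.emp table and ename column from the question (the contact point is a placeholder):

import com.datastax.driver.core.{Cluster, SimpleStatement}
import scala.collection.JavaConverters._

val cluster = Cluster.builder().addContactPoint("hostname").build()
val session = cluster.connect("demo")

// Ask for pages of 1000 rows; the driver fetches the next page
// transparently once the current one is fully consumed.
val stmt = new SimpleStatement("SELECT ename FROM demo.emp").setFetchSize(1000)
val rs = session.execute(stmt)
rs.iterator().asScala.foreach(row => println(row.getString("ename")))

cluster.close()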

Example of processing the data remotely with Spark

RDD Docs for Cassandra
Dataframe Docs for Cassandra

//RDD API
sparkContext.cassandraTable("ks","tab").foreach(row => //processRow)

//Dataframe API - although a similar foreach is available here as well
spark.read.format("org.apache.spark.sql.cassandra")
  .load()
  .select(//do some transforms)
  .write(//pick output format and destination)
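
For concreteness, a filled-in version of that skeleton might look like the following; the keyspace, table, and ename column come from the question, while the Parquet output path is just an assumed destination:

val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "keyspacename", "table" -> "tablename"))
  .load()

df.select("ename")
  .write
  .parquet("/tmp/emp_out") // hypothetical output location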

Example of using toLocalIterator, probably the least relevant method

Why you might want to do this with an example

// This reads all data in large blocks on the executors; those blocks are then pulled one at a time back to the Spark driver.
sparkContext.cassandraTable("ks","tab").toLocalIterator
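
If the goal really is "rows 1-1000, then 1001-2000", the local iterator can be chunked with grouped. A sketch, assuming the keyspace and table names from the question; note the batch boundaries are only as stable as the underlying read order, which, as noted above, is not guaranteed:

// Rows stream back to the driver lazily; grouped(1000) yields one
// Seq of up to 1000 rows per iteration, without materialising the
// whole table on the driver.
sparkContext.cassandraTable("keyspacename", "tablename")
  .toLocalIterator
  .grouped(1000)
  .foreach { batch =>
    batch.foreach(println)
  }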
0 votes

That was a driver compatibility issue. Initially I was using cassandra-java-driver-3.5.1 with apache-cassandra-3.0.9. Switching to cassandra-java-driver-3.0.8 with apache-cassandra-3.0.9, and also installing a few jar files (slf4j-log4j12-1.7.7.jar, log4j-1.2.17.jar, netty-all-4.0.39.Final.jar), made it work fine for me :)
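
In case it helps anyone hitting the same mismatch: if your project is built with sbt rather than manually installed jars, pinning those versions might look like the following (these are the Maven coordinates for the open-source driver and the jars listed above; adjust to your build tool):

// build.sbt -- pin the Java driver to a version compatible with Cassandra 3.0.9
libraryDependencies ++= Seq(
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.0.8",
  "org.slf4j"              % "slf4j-log4j12"         % "1.7.7",
  "log4j"                  % "log4j"                 % "1.2.17",
  "io.netty"               % "netty-all"             % "4.0.39.Final"
)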