0
votes

I have a requirement of scanning a table which contains 100 million record in Production. The search will be made on the first clustering key. The requirement is to find the unique partition keys where first clustering key is matching a condition. The table looks like the following -

employeeid, companyname , lastdateloggedin, floorvisted, swipetimestamp

Partition Key - employeeid Clustering Key - companyname , lastdateloggedin

I would like to get select distinct(employeeid),company, swipetimestamp where companyname = 'XYZ'. This is an SQL representation of what i would like to fetch from the table.

 SparkConf conf = new SparkConf().set("spark.cassandra.connection.enabled", "true")
            .set("spark.cassandra.auth.username", "XXXXXXXXXX")
            .set("spark.cassandra.auth.password", "XXXXXXXXX")
            .set("spark.cassandra.connection.host", "hostname")
            .set("spark.cassandra.connection.port", "29042")
            .set("spark.cassandra.connection.factory", ConnectionFactory.class)
            .set("spark.cassandra.connection.cluster_name", "ZZZZ")
            .set("spark.cassandra.connection.application_name", "ABC")
            .set("spark.cassandra.connection.local_dc", "DC1")
            .set("spark.cassandra.connection.cachedClusterFile", "/tmp/xyz/test.json")
            .set("spark.cassandra.connection.ssl.enabled", "true")
            .set("spark.cassandra.input.fetch.size_in_rows","10000") //
            .set("spark.driver.allowMultipleContexts","true")
            .set("spark.cassandra.connection.ssl.trustStore.path", "sampleabc-spark-util/src/main/resources/x.jks")
            .set("spark.cassandra.connection.ssl.trustStore.password", "cassandrasam");

 CassandraJavaRDD<CassandraRow> ctable = javaFunctions(jsc).cassandraTable("keyspacename", "employeedetails").
            select("employeeid", "companyname","swipetimestamp").where("companyname= ?","XYZ");
 List<CassandraRow> cassandraRows = ctable.distinct().collect();

This code run in non production with close 5 million data. Since it is production i would like to approach this query with caution. Questions -

  1. What are the config that should be present in my SparkConf ?
  2. Will the spark job ever bring down the db because of the large table ?
  3. Running that job might starve threads to cassandra at that moment ?
1

1 Answers

0
votes

I would recommend to use Dataframe API instead of the RDDs - theoretically, SCC may do more optimizations for that API. If you have condition on the first clustering column, then this condition should be pushed by SCC down to Cassandra and filtering will happen there. You can check that by using .expalin on the dataframe, and checking that you have rules marked with * in the PushedFilters part.

Regarding config - use default version of the spark.cassandra.input.fetch.size_in_rows - if you have too high value, then you can have a higher chance of getting timeouts. You can still bring down the nodes with default value, as SCC is reading with LOCAL_ONE, and that overload single nodes. Sometimes, reading with LOCAL_QUORUM is faster because it won't overload individual nodes too much, and won't restart the tasks that are reading data.

And I recommend to make sure that you're using latest Spark Cassandra Connector - 2.5.0 - it has a lot of new optimizations and new functionality...