I have a rather peculiar problem. In a DSE Spark analytics engine I frequently produce stats and store them in a small Cassandra table. Since I keep the table trimmed and it is meant to serve a web interface with consolidated information, I simply want to read the whole table in Spark and send the results over an API. I have tried two methods for this:

  1. val a = Try(sc.cassandraTable[Data](keyspace, table).collect()).toOption

  2. val query = "SELECT * FROM keyspace.table"
    val df = spark.sqlContext.sql(query)
    val list = df.collect()

I am doing this in a Scala program. With method 1, the Spark job mysteriously gets stuck, showing stage 10 of 12 forever; I verified this in the logs and on the Spark jobs page. With method 2, it simply tells me that no such table exists:

Unknown exception: org.apache.spark.sql.AnalysisException: Table or view not found: keyspace1.table1; line 1 pos 15; 'Project [*] +- 'UnresolvedRelation keyspace1.table1

Interestingly, I tested both methods in the Spark shell on the cluster and they work just fine. My program runs plenty of other queries using method 1 and they all work; the key difference is that each of them puts a condition on the partition key, unlike this query (this holds for the other queries against this particular table too).

Here is the table structure:

CREATE TABLE keyspace1.table1 (
    userid text,
    stat_type text,
    event_time bigint,
    stat_value double,
    PRIMARY KEY (userid, stat_type))
WITH CLUSTERING ORDER BY (stat_type ASC);

Any solid diagnosis of the problem, or a workaround, would be much appreciated.

1 Answer

When you run SELECT * without a WHERE clause in Cassandra, you are actually performing a full range query. This is not an intended use case for Cassandra (aside from peeking at the data, perhaps). Just for the fun of it, try replacing the query with SELECT * FROM keyspace.table LIMIT 10 and see if it works; it might...
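
A minimal sketch of that sanity check using the RDD API from method 1. It assumes a SparkContext is available, and that Data is a case class matching the table; the question does not show its definition, so the one below is a guess, and peek is a hypothetical helper name. Note that the connector's limit is applied to the CQL query of each Spark partition rather than globally, so take(n) is what caps the final result.

```scala
import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import scala.util.Try

// Hypothetical row class (not shown in the question). Field names rely on the
// connector's default mapping of camelCase properties to snake_case columns.
case class Data(userid: String, statType: String, eventTime: Long, statValue: Double)

// Fetch at most n rows instead of scanning the whole table.
// Returns None if the read throws; it does not protect against a hang.
def peek(sc: SparkContext, keyspace: String, table: String, n: Int = 10): Option[Array[Data]] =
  Try(
    sc.cassandraTable[Data](keyspace, table)
      .limit(n.toLong) // adds LIMIT to each per-partition CQL query
      .take(n)         // caps the overall number of rows returned
  ).toOption
```

If this returns quickly while the unrestricted collect() hangs, that points toward the full range scan rather than the Spark job itself.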

Anyway, my gut feeling says your problem isn't with Spark but with Cassandra. If you have visibility into Cassandra metrics, look at the range query latencies.

Now, if your code above is complete, the reason method 1 freezes while method 2 doesn't is that method 1 reaches an action (collect), while method 2 never gets that far: it fails during analysis (schema resolution), before any Spark action runs. Once the table resolves and df.collect() actually executes, you will face the same issue with Cassandra.
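
As for the "Table or view not found" error itself: it is raised during analysis, which suggests the session in the standalone program has no catalog entry for the Cassandra table (the shell may be configured differently). One possible workaround, sketched here under assumptions, is to bypass the catalog and load the table through the connector's data source. The sketch assumes a SparkSession is available and uses the keyspace and table names from the schema in the question; cassandraOptions and loadWholeTable are hypothetical helper names.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Connector options that identify the table to read.
def cassandraOptions(keyspace: String, table: String): Map[String, String] =
  Map("keyspace" -> keyspace, "table" -> table)

// Load the table via the Cassandra data source instead of the SQL catalog.
// This only builds the DataFrame; no Cassandra scan runs until an action is called.
def loadWholeTable(spark: SparkSession, keyspace: String, table: String): DataFrame =
  spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(cassandraOptions(keyspace, table))
    .load()

// Usage (still a full range scan once collect() runs):
// val rows: Array[Row] = loadWholeTable(spark, "keyspace1", "table1").collect()
```

This removes the dependency on table registration, but it does not change the cost of the read: collecting the whole table is still a full range query on the Cassandra side.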