0 votes

I'm using Spark 1.2 with the Spark Cassandra Connector 1.2.3, and I'm trying to update some rows of a table:

example:

CREATE TABLE myTable ( 
a text, 
b text, 
c text, 
date timestamp, 
d text, 
e text static, 
f text static, 
PRIMARY KEY ((a, b, c), date, d) 
) WITH CLUSTERING ORDER BY (date ASC, d ASC)

val interactions = sc.cassandraTable[(String, String, String, DateTime, String, String, String)]("keySpace", "myTable")
  .select("a", "b", "c", "date", "d", "e", "f")
val empty = interactions.filter(r => r._6 == null).cache()
empty.count()

I just count the number of rows containing null for "e" and then replace it with the value of "b":

val update_inter = empty.map(r => (r._1, r._2, r._3, r._4, r._5, r._2, r._7))
update_inter.saveToCassandra("keySpace", "myTable", SomeColumns("a", "b", "c", "date", "d", "e", "f"))

This works when I check in cqlsh, but I still get a null value when I read the same rows back through the Spark Cassandra connector.

Is this a bug in the Spark Cassandra connector? Thanks for your help.

Can you include how you are trying to retrieve those rows? - Gillespie
val newInteractions = sc.cassandraTable[(String, String, String, DateTime, String, String, String)]("keySpace", "myTable").select("a","b","c","date","d","e","f"); val newEmpty = newInteractions.filter(r => r._6 == null).cache(); newEmpty.count() - Amine CHERIFI
No, that's how you are selecting the rows you are trying to update. I mean how are you trying to retrieve the rows once they have been transformed. Also, are you sure the other columns are not empty as well as r._6? - Gillespie
Yes, by the same method. I'm making a new select from the Cassandra table and counting the number of rows with r._6 == null, which logically should be 0 rows. About the other columns, yes, I'm sure they are not empty - Amine CHERIFI
Ok, so that tells you whether or not you have successfully updated all rows where the 'e' column contained a null value. Is this count not returning 0 rows? - Gillespie

2 Answers

0 votes

As inserts/updates occur, rather than overwriting rows in place, Cassandra writes a new timestamped version of the inserted or updated data in another SSTable.

Either your Spark job is not actually updating the existing rows and is instead writing new rows, or the changes have not yet been flushed from memtables to SSTables on disk. If you were to write the results to a new table, the count of null 'e' columns would be zero.
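For illustration, a minimal sketch of that check, reusing the RDDs from the question and assuming a hypothetical table "myTableNew" created with the same schema as myTable:

import com.datastax.spark.connector._
import org.joda.time.DateTime

// Sketch only: "myTableNew" is a hypothetical table with the same schema as myTable.
update_inter.saveToCassandra("keySpace", "myTableNew",
  SomeColumns("a", "b", "c", "date", "d", "e", "f"))

// Re-read from the new table and re-count the rows where 'e' is null.
val check = sc.cassandraTable[(String, String, String, DateTime, String, String, String)](
    "keySpace", "myTableNew")
  .select("a", "b", "c", "date", "d", "e", "f")
  .filter(_._6 == null)
check.count()  // expected to be 0 if the updated values were written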

Try the nodetool flush command and read this: Cassandra Compaction

0 votes

.mode('append') is used for appending, I guess. I am facing a similar problem, but with the Java connector; it seems this option is available in Python.
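For reference, a rough Scala sketch of what that DataFrame-based append looks like with the connector's data source. Note that this API requires a newer Spark and connector than the 1.2 versions in the question; the keyspace and table names are just the ones from the question, reused for illustration:

// Assumes Spark 1.4+ and spark-cassandra-connector 1.4+ (DataFrame support);
// the question's Spark 1.2 / connector 1.2.3 predate this API.
val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "keySpace", "table" -> "myTable"))
  .load()

df.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "keySpace", "table" -> "myTable"))
  .mode("append")  // appended rows upsert into existing partitions in Cassandra
  .save()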