2
votes

I am fairly new to both Cassandra (2.1.11) and Spark (1.4.1) and am interested in knowing if anyone has seen/developed a solution for atomic writes to two different Cassandra tables using Spark Streaming.

I currently have two tables which hold the same data set but have different partition keys. For simplicity's sake, I'll use the familiar User table example to explain:

CREATE TABLE schema1.user_by_user_id
(
    user_id uuid
    ,email_address text
    ,num int //a value that is frequently updated
    ,PRIMARY KEY (user_id)
);

CREATE TABLE schema1.user_by_email_address
(
    email_address text
    ,user_id uuid
    ,num int //a value that is frequently updated
    ,PRIMARY KEY (email_address)
);

The email_address column will have a high cardinality (in reality it will be between 50% and 100% of the number of user_id values). High cardinality makes secondary indexes perform poorly, thus requiring the second table.

I am using Spark Streaming to process changes in the num column and update these two tables. As I understand it, the saveToCassandra() method executes writes for each item in the RDD in an UNLOGGED BATCH, thus performing atomic writes (as explained in the "Save a Collection of Objects" section here). However, saveToCassandra() can only be used to save to a single table. In order to keep both the schema1.user_by_user_id and schema1.user_by_email_address tables in sync, I have to issue two separate saveToCassandra() calls:

rdd.saveToCassandra("schema1","user_by_user_id",SomeColumns("user_id","email"address","num"))
rdd.saveToCassandra("schema1","user_by_email_address",SomeColumns("user_id","email"address","num"))

The writes occurring within each call are done atomically, but the two calls together are not atomic. An error in the second call will leave the two tables out of sync.

Obviously my data set and actual table structures are more complicated than this, but I've tried to convey the main points of my problem in as simple a way as possible. While my question is geared towards being able to save to two tables, I would welcome any alternative suggestions regarding data model changes which would completely remove this need.


1 Answer

1
vote

First thing to understand: UNLOGGED batches are not atomic. See documentation. The only thing UNLOGGED batches give you is the ability to make multiple writes all with the same timestamp.

So, if you want to make multiple calls to saveToCassandra and have them behave as if they were one call, specify the same WRITETIME for both calls. When everything is done, all of the modified data will have the same timestamp.
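A minimal sketch of what that could look like using the connector's WriteConf, assuming the connector version in use exposes TimestampOption (the column names match the example tables above):

import com.datastax.spark.connector._
import com.datastax.spark.connector.writer.{TimestampOption, WriteConf}

// One timestamp (in microseconds), shared by both saves, so every column
// written by either call carries the same WRITETIME.
val writeTime = System.currentTimeMillis() * 1000L
val sharedWriteConf = WriteConf(timestamp = TimestampOption.constant(writeTime))

rdd.saveToCassandra("schema1", "user_by_user_id",
  SomeColumns("user_id", "email_address", "num"), sharedWriteConf)
rdd.saveToCassandra("schema1", "user_by_email_address",
  SomeColumns("user_id", "email_address", "num"), sharedWriteConf)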

As for your question of how to make updates to multiple tables atomic...you can't. Cassandra doesn't support it.

The best suggestion I can think of, is to create your own sort of batch log that you can consult after a crash to figure out what needs to be re-synchronized.

Imagine something like this:

CREATE TABLE schema1.batch_log
(
    id uuid,
    updated_users set<uuid>,
    PRIMARY KEY (id)
);

When starting your job, generate a new uuid that will be the id of this job. Then, you'd issue 3 saves:

rdd.saveToCassandra("schema1", "batch_log", SomeColumns("batch_id", "user_id" append)
rdd.saveToCassandra("schema1","user_by_user_id",SomeColumns("user_id","email"address","num"))
rdd.saveToCassandra("schema1","user_by_email_address",SomeColumns("user_id","email"address","num"))

If your batch completes without any crashes, you can just delete the batch_log row that was created. However, if the system crashes partway through, then once things are back online you can consult the batch_log to get the list of users that were updated. Query those users for their email addresses and then update the user_by_email_address table. Once you finish this repair, delete the batch_log row.
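
A rough sketch of that repair pass, assuming the tables above, a SparkContext named sc, and the connector's cassandraTable / joinWithCassandraTable support (the type mappings are illustrative):

import java.util.UUID
import com.datastax.spark.connector._

// Any row still present in batch_log belongs to a batch that never finished,
// so collect the user ids it touched.
val touchedUserIds = sc.cassandraTable[(UUID, Set[UUID])]("schema1", "batch_log")
  .select("id", "updated_users")
  .flatMap { case (_, userIds) => userIds }
  .distinct()
  .map(Tuple1(_))

// Look each user up by primary key in the id-keyed table...
val repairedRows = touchedUserIds
  .joinWithCassandraTable[(UUID, String, Int)](
    "schema1", "user_by_user_id",
    SomeColumns("user_id", "email_address", "num"))
  .map { case (_, row) => row }

// ...and rewrite the email-keyed copy so the two tables agree again.
repairedRows.saveToCassandra("schema1", "user_by_email_address",
  SomeColumns("user_id", "email_address", "num"))

// Once the repair succeeds, delete the corresponding batch_log rows
// (for example with a plain CQL DELETE).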

In effect, you are implementing a Cassandra LOGGED BATCH by hand.