I'm attempting to load a large amount of data into a 10-node Cassandra ring.
The script doing the inserts gets ~4000 inserts / s, presumably blocked on network I/O. I launch 8 of these on a single machine, and the throughput scales almost linearly. (The throughput of each individual process goes down slightly, but that's more than compensated for by the additional processes.)
This works decently; however, I'm still not getting enough throughput, so I launched the same setup on 3 more VMs (thus, 8 processes × 4 VMs). After the first additional VM, and with increasing frequency and severity as further VMs are added, the following occurs:
- The clients start receiving Timeout errors. They can re-try their writes, but because they do so in batches, their forward progress is almost entirely eliminated.
- The ring becomes unstable, and nodes start labelling themselves as "down". Further, different nodes tend to have different ideas of who is down. The ring doesn't recover when the scripts are aborted. (I've not even been able to fix this by just restarting individual nodes: I've had to restart the entire ring.)
"Down" varies. In my last run:
- 4 nodes died completely. (Cassandra wasn't running at all.) Checking the logs, there didn't appear to be anything logged as to why it died.
- On the fifth, Cassandra was running, but `nodetool status` on that node hangs. Two threads appear to be stuck in infinite loops of some sort (they're using 100% CPU solidly), and there is a `java.lang.OutOfMemoryError: Java heap space` in the logs.
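For reference, here's a rough sketch (contact points are placeholders, not my real config) of dumping the Python driver's client-side view of which hosts are up, to compare against what nodetool reports:

from cassandra.cluster import Cluster

# Sketch only: print the driver's current idea of each host's state.
cluster = Cluster(['10.0.0.1', '10.0.0.2'])  # placeholder contact points
session = cluster.connect()
for host in cluster.metadata.all_hosts():
    print host.address, 'up' if host.is_up else 'down/unknown'
cluster.shutdown()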
The code is essentially:
def prepped_batch_insert(session, items, insert_query, silent=False):
    # A mapping of number of inserts -> a prepared query for that number of
    # inserts.
    prepped_statements = {}

    def get_prepped_statement(inserts):
        if inserts in prepped_statements:
            # We already created a prepared query for this many inserts, use
            # it:
            return prepped_statements[inserts]
        else:
            # We haven't yet created a prepared query for this many inserts, so
            # do so now:
            query = ['BEGIN UNLOGGED BATCH']
            for idx in xrange(inserts):
                query.append(insert_query.query)
            query.append('APPLY BATCH;')
            query = '\n'.join(query)
            ps = session.prepare(query)
            prepped_statements[inserts] = ps
            return ps

    def do_prepped_batch_insert(batch):
        ps = get_prepped_statement(len(batch))
        # Generate the list of params to the prepared query:
        params = []
        for idx, item in enumerate(batch):
            for k in insert_query.keyorder:
                params.append(item[k])
        # Do it.
        session.execute(ps, params)

    return inserter.insert_and_time(
        items,  # data generator
        do_prepped_batch_insert,  # The above function
        _WHAT_APPEARS_TO_BE_THE_OPTIMAL_CASSANDRA_BATCH_SIZE,  # = 200
        silent=silent,
    )
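As an aside, I believe the same unlogged batches could be built with the driver's BatchStatement instead of preparing an N-row statement per batch size; a sketch, assuming a driver / protocol version that supports it (insert_ps would be the single-row prepared INSERT and keyorder the column order, analogous to insert_query.keyorder above):

from cassandra.query import BatchStatement, BatchType

def do_batch_insert(session, insert_ps, keyorder, batch):
    # Sketch only (not what I'm running): add one bound row per item to an
    # unlogged batch and execute it in a single round trip.
    bs = BatchStatement(batch_type=BatchType.UNLOGGED)
    for item in batch:
        bs.add(insert_ps, [item[k] for k in keyorder])
    session.execute(bs)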
The function insert_and_time splits items up into batches of size 200, calls the above function, and times the whole kit and kaboodle. (This code is toxic to the ring.)
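insert_and_time itself isn't shown; roughly, it does something like this (a sketch, not the actual helper):

import time

def insert_and_time(items, do_insert, batch_size, silent=False):
    # Split `items` into batches of `batch_size`, insert each batch, and time
    # the whole run.
    start = time.time()
    total = 0
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            do_insert(batch)
            total += len(batch)
            batch = []
    if batch:
        do_insert(batch)
        total += len(batch)
    elapsed = time.time() - start
    if not silent:
        print '%d inserts in %.1f s (%.0f inserts / s)' % (total, elapsed, total / elapsed)
    return total, elapsed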
We attempted to push more load because I was told that 20k inserts / second was slow (it will take a while to insert the data I'd like to insert at that rate…) and that Cassandra is capable of very high throughput.
My questions:
- Is there anything unusual about what I'm doing? Anything wrong?
- Am I simply DDoS-ing my ring?
- How can I go about debugging what's wrong?
- An errant client, IMHO, should never be able to kill the server. (And the above isn't terribly errant.) Anything I can do to prevent this?
¹ The client also appears to slowly leak file descriptors. I don't think this is related (I'm calling .shutdown on both the cluster and the connection), but looking at the driver source, there appear to be plenty of pathways where an exception would cause a leak.
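For context, the kind of cleanup I mean (a simplified sketch; the contact point and keyspace are placeholders):

from cassandra.cluster import Cluster

cluster = Cluster(['10.0.0.1'])           # placeholder contact point
session = cluster.connect('my_keyspace')  # placeholder keyspace
try:
    pass  # run prepped_batch_insert(session, ...) here
finally:
    # Shut down both the session and the cluster even if the inserts raise.
    session.shutdown()
    cluster.shutdown()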