We want to test that if a column has TTL (time-to-live) property it eventually will be removed from cassandra entirely along with the empty row which contained it.
As I understood, the algorithm for testing this behavious is
- when saving an object, set TTL for a column
- wait when TTL time passes, check that returned value is null
- wait when GC_GRACE_SECONDS perion passes
- check that the row also gets removed
And I failed to check the last item.
As I discovered (eg. here or here and in other places), I need to run compaction. Similar questions have been raised (eg. Hector (Cassandra) Delete Anomaly), but I didn't find anything that helped, and googling hasn't helped much.
So the question is, how I can force compaction from my integration test (using hector) to ensure that it behaves as expected? Or are there other ways to do this?
P.S. Truncating a column family is not an option.
Here are the details.
My tests:
private static final String KEYSPACE = "KEYSPACE";
private static final String COLUMN_FAMILY = "COLUMN_FAMILY";
private static final int GC_CRACE_SECONDS = 5;
// sut
private CassandraService cassandraService;
// dependencies
private Cluster cluster = HFactory.getOrCreateCluster("tstCltr",
"localhost:9160");
private Keyspace keyspace;
@BeforeClass
public static void setupBeforeClass() {
EmbeddedCassandraDaemon.getEmbeddedCassandraDaemon();
}
@Before
public void setUp() throws Exception {
keyspace = createKeyspace(KEYSPACE, cluster,
new QuorumAllConsistencyLevelPolicy());
cassandraService = new CassandraService(cluster, KEYSPACE,
COLUMN_FAMILY, GC_CRACE_SECONDS);
}
@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
Object obj = "OBJECT";
String rowKey = "key";
String columnName = "columnName";
logger.info("before persisting rows count is {}" + countRows());
cassandraService.persistObjectWithTtl(rowKey, columnName, obj, 5);
logger.info("after persisting rows count is {}" + countRows());
Object value = retrieve(rowKey, columnName);
assertNotNull(value);
logger.info("before TTL passes rows count is {}" + countRows());
TimeUnit.SECONDS.sleep(6);
Object nullValue = retrieve(rowKey, columnName);
assertNull(nullValue);
logger.info("after TTL passes rows count is {}" + countRows());
TimeUnit.SECONDS.sleep(10);
logger.info("wait 10 more seconds... rows count is {}" + countRows());
System.out.println("================================" + countRows());
TimeUnit.SECONDS.sleep(120);
int countRows = countRows();
logger.info("wait 2 more minutes... rows count is {}" + countRows);
assertEquals(0, countRows);
}
Code for persisting:
public void persistObjectWithTtl(Object rowKey, Object columnName,
Object obj, int ttl) {
LOGGER.debug("Persist {} / {}", rowKey, columnName);
HColumn<Object, Object> column = createColumn(columnName, obj,
SERIALIZER, SERIALIZER);
column.setTtl(ttl);
executeInsertion(rowKey, column);
}
private void executeInsertion(Object rowKey, HColumn<Object, Object> column) {
Mutator<Object> mutator = createMutator(keyspace, SERIALIZER);
mutator.addInsertion(rowKey, this.columnFamilyName, column);
mutator.execute();
}
Setting GcGraceSeconds for a column family:
private void addColumnFamily(String keySpaceName, String columnFamilyName,
int gcGraceSeconds) {
ColumnFamilyDefinition columnFamilyDefinition =
createColumnFamilyDefinition(keySpaceName, columnFamilyName);
ThriftCfDef columnFamilyWithGCGraceSeconds =
new ThriftCfDef(columnFamilyDefinition);
columnFamilyWithGCGraceSeconds.setGcGraceSeconds(gcGraceSeconds);
cluster.addColumnFamily(columnFamilyWithGCGraceSeconds);
}
And the code for counting rows, found on SO:
public int countRows() {
int rowCount = 100;
ObjectSerializer serializer = ObjectSerializer.get();
RangeSlicesQuery<Object, Object, Object> rangeSlicesQuery =
HFactory.createRangeSlicesQuery(keyspace, serializer,
serializer, serializer)
.setColumnFamily(COLUMN_FAMILY)
.setRange(null, null, false, 10)
.setRowCount(rowCount);
Object lastKey = null;
int i = 0;
while (true) {
rangeSlicesQuery.setKeys(lastKey, null);
QueryResult<OrderedRows<Object, Object, Object>> result =
rangeSlicesQuery.execute();
OrderedRows<Object, Object, Object> rows = result.get();
Iterator<Row<Object, Object, Object>> rowsIterator = rows.iterator();
if (lastKey != null && rowsIterator != null) {
rowsIterator.next();
}
while (rowsIterator.hasNext()) {
Row<Object, Object, Object> row = rowsIterator.next();
lastKey = row.getKey();
i++;
if (row.getColumnSlice().getColumns().isEmpty()) {
continue;
}
}
if (rows.getCount() < rowCount) {
break;
}
}
return i;
}
Thanks.
Update:
The reason was that the amount of data was not enoght for compaction to run, so I needed to put more data, and flush tables more frequently to disk. So I ended up with the following test case:
@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
final int expectedAmount = 50000;
logger.info("before persisting rows count is {}", countRows());
for (int i = 0; i < expectedAmount; i++) {
String rowKey = RandomStringUtils.randomAlphanumeric(128);
Object obj = RandomStringUtils.randomAlphanumeric(1000);
cassandraService.persistObjectWithTtl(rowKey, COLUMN_NAME, obj, 20);
if (i % 100 == 0) {
StorageService.instance.forceTableFlush(KEYSPACE, COLUMN_FAMILY);
}
}
logger.info("causing major compaction...");
StorageService.instance.forceTableCompaction(KEYSPACE, COLUMN_FAMILY);
logger.info("after major compaction rows count is {}", countRows());
waitAtMost(Duration.TWO_MINUTES)
.pollDelay(Duration.TWO_SECONDS)
.pollInterval(Duration.ONE_HUNDRED_MILLISECONDS)
.until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
int countRows = countRows();
logger.info("the rows count is {}", countRows);
return countRows < expectedAmount;
}
});
}
full code : test class and sut