
We want to test that if a column has a TTL (time-to-live) set, it will eventually be removed from Cassandra entirely, along with the empty row that contained it.

As I understand it, the algorithm for testing this behaviour is:

  • when saving an object, set a TTL for the column
  • wait until the TTL passes, then check that the returned value is null
  • wait until the GC_GRACE_SECONDS period passes
  • check that the row has also been removed

And it's the last item that I failed to verify.

As I discovered (e.g. here or here, and in other places), I need to run a compaction. Similar questions have been asked (e.g. Hector (Cassandra) Delete Anomaly), but I didn't find anything that helped, and googling hasn't helped much either.

So the question is: how can I force a compaction from my integration test (using Hector) to verify that it behaves as expected? Or are there other ways to achieve this?

P.S. Truncating a column family is not an option.


Here are the details.

My tests:

private static final String KEYSPACE = "KEYSPACE";
private static final String COLUMN_FAMILY = "COLUMN_FAMILY";

private static final int GC_GRACE_SECONDS = 5;

// sut
private CassandraService cassandraService;

// dependencies
private Cluster cluster = HFactory.getOrCreateCluster("tstCltr", 
    "localhost:9160");

private Keyspace keyspace;

@BeforeClass
public static void setupBeforeClass() {
    EmbeddedCassandraDaemon.getEmbeddedCassandraDaemon();
}

@Before
public void setUp() throws Exception {
    keyspace = createKeyspace(KEYSPACE, cluster, 
        new QuorumAllConsistencyLevelPolicy());
    cassandraService = new CassandraService(cluster, KEYSPACE, 
        COLUMN_FAMILY, GC_GRACE_SECONDS);
}

@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
    Object obj = "OBJECT";
    String rowKey = "key";
    String columnName = "columnName";
    logger.info("before persisting rows count is {}" + countRows());

    cassandraService.persistObjectWithTtl(rowKey, columnName, obj, 5);

    logger.info("after persisting rows count is {}" + countRows());

    Object value = retrieve(rowKey, columnName);
    assertNotNull(value);

    logger.info("before TTL passes rows count is {}" + countRows());

    TimeUnit.SECONDS.sleep(6);

    Object nullValue = retrieve(rowKey, columnName);
    assertNull(nullValue);

    logger.info("after TTL passes rows count is {}" + countRows());

    TimeUnit.SECONDS.sleep(10);

    logger.info("wait 10 more seconds... rows count is {}" + countRows());
    System.out.println("================================" + countRows());

    TimeUnit.SECONDS.sleep(120);

    int countRows = countRows();
    logger.info("wait 2 more minutes... rows count is {}" + countRows);
    assertEquals(0, countRows);
}

Code for persisting:

public void persistObjectWithTtl(Object rowKey, Object columnName, 
        Object obj, int ttl) {
    LOGGER.debug("Persist {} / {}", rowKey, columnName);
    HColumn<Object, Object> column = createColumn(columnName, obj, 
            SERIALIZER, SERIALIZER);
    column.setTtl(ttl);
    executeInsertion(rowKey, column);
}

private void executeInsertion(Object rowKey, HColumn<Object, Object> column) {
    Mutator<Object> mutator = createMutator(keyspace, SERIALIZER);
    mutator.addInsertion(rowKey, this.columnFamilyName, column);
    mutator.execute();
}
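
The retrieve helper called by the test isn't shown here; below is a minimal sketch of what it could look like with Hector's ColumnQuery, assuming it lives in the test class and reuses the keyspace field and ObjectSerializer from above (the actual implementation is in the linked source):

// hypothetical sketch of the retrieve helper used by the test
private Object retrieve(Object rowKey, Object columnName) {
    ColumnQuery<Object, Object, Object> query = HFactory.createColumnQuery(
            keyspace, ObjectSerializer.get(), ObjectSerializer.get(),
            ObjectSerializer.get());
    query.setColumnFamily(COLUMN_FAMILY)
         .setKey(rowKey)
         .setName(columnName);

    // once the TTL has expired, the column is gone and get() returns null
    HColumn<Object, Object> column = query.execute().get();
    return column == null ? null : column.getValue();
}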

Setting GcGraceSeconds for a column family:

private void addColumnFamily(String keySpaceName, String columnFamilyName, 
            int gcGraceSeconds) {
    ColumnFamilyDefinition columnFamilyDefinition = 
        createColumnFamilyDefinition(keySpaceName, columnFamilyName);

    ThriftCfDef columnFamilyWithGCGraceSeconds = 
        new ThriftCfDef(columnFamilyDefinition);
    columnFamilyWithGCGraceSeconds.setGcGraceSeconds(gcGraceSeconds);

    cluster.addColumnFamily(columnFamilyWithGCGraceSeconds);
}
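
In the test setup this is called once with the constants defined above, along these lines (hypothetical invocation, the exact wiring is in the linked source):

// create the column family with a deliberately short grace period for the test
addColumnFamily(KEYSPACE, COLUMN_FAMILY, GC_GRACE_SECONDS);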

And the code for counting rows, found on SO:

public int countRows() {
    int rowCount = 100;

    ObjectSerializer serializer = ObjectSerializer.get();
    RangeSlicesQuery<Object, Object, Object> rangeSlicesQuery =
            HFactory.createRangeSlicesQuery(keyspace, serializer, 
                serializer, serializer)
                    .setColumnFamily(COLUMN_FAMILY)
                    .setRange(null, null, false, 10)
                    .setRowCount(rowCount);

    Object lastKey = null;

    int i = 0;
    while (true) {
        rangeSlicesQuery.setKeys(lastKey, null);

        QueryResult<OrderedRows<Object, Object, Object>> result = 
            rangeSlicesQuery.execute();
        OrderedRows<Object, Object, Object> rows = result.get();
        Iterator<Row<Object, Object, Object>> rowsIterator = rows.iterator();

        // each page starts with the last key of the previous page,
        // so skip it to avoid counting that row twice
        if (lastKey != null && rowsIterator.hasNext()) {
            rowsIterator.next();
        }

        while (rowsIterator.hasNext()) {
            Row<Object, Object, Object> row = rowsIterator.next();
            lastKey = row.getKey();
            // empty rows ("range ghosts", i.e. rows whose columns have all
            // expired or been deleted) are deliberately counted as well;
            // they only disappear after gc_grace_seconds and a compaction
            i++;
        }

        if (rows.getCount() < rowCount) {
            break;
        }

    }

    return i;
}

Thanks.


Update:

The reason was that there wasn't enough data for compaction to run, so I needed to write more data and flush the tables to disk more often. I ended up with the following test case:

@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
    final int expectedAmount = 50000;

    logger.info("before persisting rows count is {}", countRows());

    for (int i = 0; i < expectedAmount; i++) {
        String rowKey = RandomStringUtils.randomAlphanumeric(128);
        Object obj = RandomStringUtils.randomAlphanumeric(1000);
        cassandraService.persistObjectWithTtl(rowKey, COLUMN_NAME, obj, 20);

        if (i % 100 == 0) {
            StorageService.instance.forceTableFlush(KEYSPACE, COLUMN_FAMILY);
        }
    }

    logger.info("causing major compaction...");
    StorageService.instance.forceTableCompaction(KEYSPACE, COLUMN_FAMILY);
    logger.info("after major compaction rows count is {}", countRows());

    waitAtMost(Duration.TWO_MINUTES)
        .pollDelay(Duration.TWO_SECONDS)
        .pollInterval(Duration.ONE_HUNDRED_MILLISECONDS)
        .until(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                int countRows = countRows();
                logger.info("the rows count is {}", countRows);
                return countRows < expectedAmount;
            }
        });
}

Full code: test class and sut.

1 Answer

Since you're working with Java, you can easily force a compaction through JMX by using the forceTableCompaction(keyspace, columnFamily) method of the org.apache.cassandra.db.StorageService MBean.
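
The test above runs against an embedded Cassandra, so StorageService.instance can be invoked in-process, as the update does. Against a standalone node, the same MBean operation can be invoked over a remote JMX connection. A minimal sketch, assuming Cassandra's default JMX port 7199 and the keyspace/column family names from the question:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class CompactionForcer {
    public static void main(String[] args) throws Exception {
        // Cassandra exposes JMX on port 7199 by default
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            ObjectName storageService =
                    new ObjectName("org.apache.cassandra.db:type=StorageService");

            // forceTableCompaction(String keyspace, String... columnFamilies)
            connection.invoke(storageService,
                    "forceTableCompaction",
                    new Object[] { "KEYSPACE", new String[] { "COLUMN_FAMILY" } },
                    new String[] { String.class.getName(), String[].class.getName() });
        } finally {
            connector.close();
        }
    }
}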