1
votes

I'm trying to work with DynamoDb streams, I am using the example code shown in this article. I've modified it to work in a basic Spring Boot app (initializr), utilizing an existing DynamoDb table which has streams enabled. Everything appears to work, however; I'm not seeing any new updates.

This particular database has a bulk update once per day at a specific time, it may get some minor changes now and then during the day. I'm trying to monitor these minor updates. When I run the application I can see the records from the bulk update, however if my application is running and I use the AWS Console to modify, create or delete a record I don't seem to get any output.

I'm using:

  • Spring Boot:2.3.9.RELEASE
  • amazon-kinesis-client:1.14.2
  • Java 11
  • Running on Mac Catalina (though that shouldn't matter)

In my test application I did the following:

package com.test.dynamodb_streams_test_kcl.service;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.model.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;

@Slf4j
@Service
@RequiredArgsConstructor
public class LowLevelKclProcessor {
    private static final String dynamoDbTableName = "global-items";

    private final AmazonDynamoDB dynamoDB;
    private final AmazonDynamoDBStreams dynamoDBStreams;
    private final ZonedDateTime startTime = ZonedDateTime.now();

    @PostConstruct
    public void initialize() {
        log.info("Describing table={}", dynamoDbTableName);
        DescribeTableResult itemTableDescription = dynamoDB.describeTable(dynamoDbTableName);
        log.info("Got description");
        String itemTableStreamArn = itemTableDescription.getTable().getLatestStreamArn();
        log.info("Got stream arn ({}) for table={} tableArn={}", itemTableStreamArn,
                itemTableDescription.getTable().getTableName(), itemTableDescription.getTable().getTableArn());

        // Get all the shard IDs from the stream.  Note that DescribeStream returns
        // the shard IDs one page at a time.
        String lastEvaluatedShardId = null;

        do {
            DescribeStreamResult describeStreamResult = dynamoDBStreams.describeStream(
                    new DescribeStreamRequest()
                            .withStreamArn(itemTableStreamArn)
                            .withExclusiveStartShardId(lastEvaluatedShardId));
            List<Shard> shards = describeStreamResult.getStreamDescription().getShards();

            // Process each shard on this page

            for (Shard shard : shards) {

                String shardId = shard.getShardId();
                System.out.println("Shard: " + shard);

                // Get an iterator for the current shard

                GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
                        .withStreamArn(itemTableStreamArn)
                        .withShardId(shardId)
                        .withShardIteratorType(ShardIteratorType.LATEST);
                GetShardIteratorResult getShardIteratorResult =
                        dynamoDBStreams.getShardIterator(getShardIteratorRequest);
                String currentShardIter = getShardIteratorResult.getShardIterator();

                // Shard iterator is not null until the Shard is sealed (marked as READ_ONLY).
                // To prevent running the loop until the Shard is sealed, which will be on average
                // 4 hours, we process only the items that were written into DynamoDB and then exit.
                int processedRecordCount = 0;
                while (currentShardIter != null && processedRecordCount < 100) {
                    System.out.println("    Shard iterator: " + currentShardIter.substring(380));

                    // Use the shard iterator to read the stream records

                    GetRecordsResult getRecordsResult = dynamoDBStreams.getRecords(new GetRecordsRequest()
                            .withShardIterator(currentShardIter));
                    List<Record> records = getRecordsResult.getRecords();
                    for (Record record : records) {
                        // I set a breakpoint on the line below, but it was never hit after the bulk update info
                        if (startTime.isBefore(ZonedDateTime.ofInstant(record.getDynamodb()
                                .getApproximateCreationDateTime().toInstant(), ZoneId.systemDefault()))) {
                            System.out.println("        " + record.getDynamodb());
                        }
                    }
                    processedRecordCount += records.size();
                    currentShardIter = getRecordsResult.getNextShardIterator();
                }
            }

            // If LastEvaluatedShardId is set, then there is
            // at least one more page of shard IDs to retrieve
            lastEvaluatedShardId = describeStreamResult.getStreamDescription().getLastEvaluatedShardId();

        } while (lastEvaluatedShardId != null);
    }
}
1

1 Answers

1
votes

Note that your test is based on the low-level API, not on the Kenisis client library. So it's normal to have some tricky technical details to deal with.


Your test application has some similarities with the example given in the doc, but it has issues:

When I run the application I can see the records from the bulk update

ShardIteratorType.LATEST will not look for old records that happened before running the test (It starts reading just after the most recent stream records in the shard)

So, I will assume that the iterator type was different (ex: TRIM_HORIZON) and changed later to LATEST during your tests.

The main issue comes from the fact that your application will sequentially poll shards, and it will bloque in the first shard until it finds 100 new records in this shard (due to LATEST iterator type).

So, you may not see the new minor changes while the test is running if they belong to a different shard.

Solutions:

1- Poll shards in parallel using threads.

2- Filter returned shards using the sequence number of the last logged record, and try to guess the shard that may contain minor changes.

3- Dangerous & I'm not sure if it works :) In a test table, and if your data model allows this: close the current stream, and enable a new one, then make sure that all your writes belong to one partition. In the majority of cases, table partitions have a one-to-one relationship with active shards. Theoretically, you have only one active shard to deal with.