I'm trying to work with DynamoDB Streams, using the example code shown in this article. I've modified it to work in a basic Spring Boot app (generated with Spring Initializr), using an existing DynamoDB table which has streams enabled. Everything appears to work; however, I'm not seeing any new updates.
This particular database has a bulk update once per day at a specific time, and it may get some minor changes now and then during the day. I'm trying to monitor these minor updates. When I run the application I can see the records from the bulk update; however, if my application is running and I use the AWS Console to modify, create, or delete a record, I don't seem to get any output.
I'm using:
- Spring Boot:2.3.9.RELEASE
- amazon-kinesis-client:1.14.2
- Java 11
- Running on Mac Catalina (though that shouldn't matter)
In my test application I did the following:
package com.test.dynamodb_streams_test_kcl.service;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.model.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;
/**
 * Makes a single pass over the DynamoDB stream of the {@code global-items} table when the
 * bean is initialized and prints every stream record created after this bean was constructed.
 *
 * <p>The pass visits each shard of the table's latest stream once, reading from a
 * {@link ShardIteratorType#LATEST} iterator. Reading of a shard stops when the shard is
 * exhausted, when {@link #MAX_RECORDS_PER_SHARD} records have been read, or when the shard
 * has returned {@link #MAX_CONSECUTIVE_EMPTY_POLLS} empty responses in a row. Because this is
 * one bounded pass inside {@code @PostConstruct}, changes made to the table after the pass
 * has finished are not observed by this class.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class LowLevelKclProcessor {

    private static final String dynamoDbTableName = "global-items";

    /** Maximum number of records read from one shard before moving on to the next shard. */
    private static final int MAX_RECORDS_PER_SHARD = 100;

    /**
     * Number of consecutive empty GetRecords responses after which a shard is treated as idle.
     * Without this bound an open shard that receives few writes keeps returning a non-null
     * iterator and would be polled forever, so the remaining shards would never be visited.
     */
    private static final int MAX_CONSECUTIVE_EMPTY_POLLS = 10;

    /** Pause after an empty response so an idle shard is not polled in a tight loop. */
    private static final long EMPTY_POLL_PAUSE_MILLIS = 250L;

    /** Index from which the shard iterator string is printed (only its tail is shown). */
    private static final int SHARD_ITERATOR_PRINT_OFFSET = 380;

    private final AmazonDynamoDB dynamoDB;
    private final AmazonDynamoDBStreams dynamoDBStreams;

    /** Moment this bean was constructed; older stream records are not printed. */
    private final ZonedDateTime startTime = ZonedDateTime.now();

    /**
     * Looks up the table's latest stream and reads every shard of it once.
     *
     * @throws IllegalStateException if the table description carries no stream ARN
     */
    @PostConstruct
    public void initialize() {
        log.info("Describing table={}", dynamoDbTableName);
        DescribeTableResult itemTableDescription = dynamoDB.describeTable(dynamoDbTableName);
        log.info("Got description");

        String itemTableStreamArn = itemTableDescription.getTable().getLatestStreamArn();
        log.info("Got stream arn ({}) for table={} tableArn={}", itemTableStreamArn,
                itemTableDescription.getTable().getTableName(),
                itemTableDescription.getTable().getTableArn());
        if (itemTableStreamArn == null) {
            // Fail with a clear message instead of passing a null ARN on to DescribeStream.
            throw new IllegalStateException(
                    "Table " + dynamoDbTableName + " has no stream ARN; are streams enabled?");
        }

        // DescribeStream returns the shards one page at a time; a non-null
        // LastEvaluatedShardId means at least one more page is available.
        String lastEvaluatedShardId = null;
        do {
            DescribeStreamResult describeStreamResult = dynamoDBStreams.describeStream(
                    new DescribeStreamRequest()
                            .withStreamArn(itemTableStreamArn)
                            .withExclusiveStartShardId(lastEvaluatedShardId));

            for (Shard shard : describeStreamResult.getStreamDescription().getShards()) {
                System.out.println("Shard: " + shard);
                if (!readShard(itemTableStreamArn, shard.getShardId())) {
                    return; // interrupted while waiting; stop the whole pass
                }
            }

            lastEvaluatedShardId =
                    describeStreamResult.getStreamDescription().getLastEvaluatedShardId();
        } while (lastEvaluatedShardId != null);
    }

    /**
     * Reads one shard until it is exhausted, the record cap is reached, or it has been idle
     * for {@link #MAX_CONSECUTIVE_EMPTY_POLLS} consecutive polls.
     *
     * @param streamArn ARN of the stream the shard belongs to
     * @param shardId   id of the shard to read
     * @return {@code false} if the thread was interrupted while pausing, {@code true} otherwise
     */
    private boolean readShard(String streamArn, String shardId) {
        GetShardIteratorResult getShardIteratorResult = dynamoDBStreams.getShardIterator(
                new GetShardIteratorRequest()
                        .withStreamArn(streamArn)
                        .withShardId(shardId)
                        .withShardIteratorType(ShardIteratorType.LATEST));
        String currentShardIter = getShardIteratorResult.getShardIterator();

        int processedRecordCount = 0;
        int consecutiveEmptyPolls = 0;
        while (currentShardIter != null
                && processedRecordCount < MAX_RECORDS_PER_SHARD
                && consecutiveEmptyPolls < MAX_CONSECUTIVE_EMPTY_POLLS) {
            System.out.println(" Shard iterator: " + iteratorTail(currentShardIter));

            GetRecordsResult getRecordsResult = dynamoDBStreams.getRecords(
                    new GetRecordsRequest().withShardIterator(currentShardIter));
            List<Record> records = getRecordsResult.getRecords();
            for (Record record : records) {
                if (isCreatedAfterStartup(record)) {
                    System.out.println(" " + record.getDynamodb());
                }
            }
            processedRecordCount += records.size();
            currentShardIter = getRecordsResult.getNextShardIterator();

            if (!records.isEmpty()) {
                consecutiveEmptyPolls = 0;
                continue;
            }
            consecutiveEmptyPolls++;
            boolean willPollAgain = currentShardIter != null
                    && consecutiveEmptyPolls < MAX_CONSECUTIVE_EMPTY_POLLS;
            if (willPollAgain && !pauseBeforeNextPoll()) {
                return false;
            }
        }
        return true;
    }

    /** Tells whether the record's approximate creation time is later than {@link #startTime}. */
    private boolean isCreatedAfterStartup(Record record) {
        // Comparing instants is equivalent to ZonedDateTime.isBefore, which ignores the zone.
        return startTime.toInstant().isBefore(
                record.getDynamodb().getApproximateCreationDateTime().toInstant());
    }

    /**
     * Sleeps for {@link #EMPTY_POLL_PAUSE_MILLIS}.
     *
     * @return {@code false} if the sleep was interrupted (the interrupt flag is restored)
     */
    private boolean pauseBeforeNextPoll() {
        try {
            Thread.sleep(EMPTY_POLL_PAUSE_MILLIS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Interrupted while polling the stream; stopping");
            return false;
        }
    }

    /**
     * Returns the part of the iterator string starting at {@link #SHARD_ITERATOR_PRINT_OFFSET},
     * or an empty string when the iterator is shorter than that, so printing never throws.
     */
    private static String iteratorTail(String shardIterator) {
        return shardIterator.substring(
                Math.min(SHARD_ITERATOR_PRINT_OFFSET, shardIterator.length()));
    }
}