12
votes

I'm attempting to capture DynamoDB table changes using DynamoDB Streams and the AWS-provided DynamoDB Streams Kinesis Adapter for Java. I'm working with the AWS Java SDKs in a Scala app.

I started by following the AWS guide and going through the AWS published code example. However, I'm having trouble getting Amazon's own published code working in my environment. My issue lies with the KinesisClientLibConfiguration object.

In the example code, KinesisClientLibConfiguration is configured with the stream ARN provided by DynamoDB.

new KinesisClientLibConfiguration("streams-adapter-demo",
    streamArn, 
    streamsCredentials, 
    "streams-demo-worker")

I followed a similar pattern in my Scala app by first locating the current ARN from my Dynamo table:

lazy val streamArn = dynamoClient.describeTable(config.tableName)
  .getTable.getLatestStreamArn

And then creating the KinesisClientLibConfiguration with the provided ARN:

lazy val kinesisConfig: KinesisClientLibConfiguration =
  new KinesisClientLibConfiguration(
    "testProcess",
    streamArn,
    defaultProviderChain,
    "testWorker"
  ).withMaxRecords(1000)
    .withRegionName("eu-west-1")
    .withMetricsLevel(MetricsLevel.NONE)
    .withIdleTimeBetweenReadsInMillis(500)
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)

I've verified the provided stream ARN and everything matches what I see in the AWS console.

At runtime I end up getting an exception stating that the provided ARN is not a valid stream name:

com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask call
SEVERE: Caught exception while sync'ing Kinesis shards and leases
com.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation error detected: Value 'arn:aws:dynamodb:eu-west-1:STREAM ARN' at 'streamName' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+ (Service: AmazonKinesis; Status Code: 400; Error Code: ValidationException; Request ID: )

Looking at the documentation for KinesisClientLibConfiguration, this does make sense: the second parameter is listed as streamName, without any mention of an ARN.
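To see why the service rejects the value, here is a minimal sketch that applies the pattern quoted in the error message to an ARN-shaped string. The object name, the helper, and the example ARN are made up for illustration; only the regular expression comes from the exception above.

```scala
object StreamNameCheck {
  // Pattern quoted in the ValidationException above.
  private val StreamNamePattern = "[a-zA-Z0-9_.-]+"

  /** True when `value` would satisfy the Kinesis `streamName` constraint. */
  def isValidStreamName(value: String): Boolean =
    value.matches(StreamNamePattern)

  def main(args: Array[String]): Unit = {
    // Hypothetical ARN, for illustration only.
    val exampleArn =
      "arn:aws:dynamodb:eu-west-1:123456789012:table/Example/stream/2016-01-01T00:00:00.000"

    println(isValidStreamName(exampleArn))       // false: ':' and '/' are not in the allowed set
    println(isValidStreamName("example-stream")) // true
  }
}
```

Any DynamoDB stream ARN contains ':' and '/', so it can never pass this check when the request goes to the Kinesis service itself.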

I can't find anything on KinesisClientLibConfiguration that relates to an ARN. Since I'm working with a DynamoDB stream and not a Kinesis stream, I'm also unsure how to find my stream name.

At this point I'm unsure what I'm missing from the published AWS example. It seems like they may be using a much older version of the KCL. I'm using version 1.7.0 of amazon-kinesis-client.


4 Answers

4
votes

The issue actually ended up being outside of my KinesisClientLibConfiguration.

I was able to get around this issue by keeping the same configuration and giving the worker the stream adapter client from the DynamoDB Streams adapter library, along with clients for both DynamoDB and CloudWatch.

My working solution now looks like this.

Defining the Kinesis client config.

// Kinesis config for DynamoDB streams
lazy val kinesisConfig: KinesisClientLibConfiguration =
  new KinesisClientLibConfiguration(
    getClass.getName,         // DynamoDB shard lease table name
    streamArn,                // pulled from the dynamo table at runtime
    dynamoCredentials,        // DefaultAWSCredentialsProviderChain
    KeywordTrackingActor.NAME // Lease owner name
  ).withMaxRecords(1000)                   // using AWS recommended value
    .withIdleTimeBetweenReadsInMillis(500) // using AWS recommended value
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)

Define a stream adapter and a CloudWatch client.

val streamAdapterClient :AmazonDynamoDBStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoCredentials)
streamAdapterClient.setRegion(region)

val cloudWatchClient :AmazonCloudWatchClient = new AmazonCloudWatchClient(dynamoCredentials)
cloudWatchClient.setRegion(region)

Create an instance of a RecordProcessorFactory. It's up to you to define a class that implements the KCL-provided IRecordProcessorFactory, along with the IRecordProcessor it returns.

val recordProcessorFactory :RecordProcessorFactory = new RecordProcessorFactory(context, keywordActor, config.keywordColumnName)
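For readers who have not written one, here is a minimal sketch of what such a factory and processor might look like. It assumes the KCL 1.x "v2" interfaces and the dynamodb-streams-kinesis-adapter library are on the classpath. The class names PrintingRecordProcessor and PrintingRecordProcessorFactory are hypothetical and are not the RecordProcessorFactory used above.

```scala
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.{IRecordProcessor, IRecordProcessorFactory}
import com.amazonaws.services.kinesis.clientlibrary.types.{InitializationInput, ProcessRecordsInput, ShutdownInput}
import scala.collection.JavaConverters._

// Hypothetical processor: prints each DynamoDB change event it receives.
class PrintingRecordProcessor extends IRecordProcessor {

  override def initialize(input: InitializationInput): Unit =
    println(s"Starting on shard ${input.getShardId}")

  override def processRecords(input: ProcessRecordsInput): Unit = {
    input.getRecords.asScala.foreach {
      // The adapter client wraps each DynamoDB stream record in a RecordAdapter.
      case adapter: RecordAdapter =>
        val record = adapter.getInternalObject
        println(s"${record.getEventName}: ${record.getDynamodb.getKeys}")
      case other =>
        println(s"Unexpected record type: ${other.getClass.getName}")
    }
    // Checkpointing after every batch keeps the sketch short; real code would
    // likely checkpoint less often and handle the exceptions this can throw.
    input.getCheckpointer.checkpoint()
  }

  override def shutdown(input: ShutdownInput): Unit =
    // Checkpoint only when the shard is finished (TERMINATE), not when the
    // lease was lost. Comparing by name avoids importing ShutdownReason,
    // whose package may differ between KCL versions (an assumption here).
    if (input.getShutdownReason.name() == "TERMINATE")
      input.getCheckpointer.checkpoint()
}

// Hypothetical factory: the worker calls this once per shard it processes.
class PrintingRecordProcessorFactory extends IRecordProcessorFactory {
  override def createProcessor(): IRecordProcessor = new PrintingRecordProcessor
}
```

An instance of the factory would then be passed to the worker builder's recordProcessorFactory call in place of the application-specific one.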

And the part I was missing: all of this needs to be provided to your worker.

val worker :Worker =
  new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(kinesisConfig)
    .kinesisClient(streamAdapterClient)
    .dynamoDBClient(dynamoClient)
    .cloudWatchClient(cloudWatchClient)
    .build()

//this will start record processing
streamExecutorService.submit(worker)
0
votes

Alternatively, you can use com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker instead of com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker. StreamsWorker uses the AmazonDynamoDBStreamsAdapterClient internally.

For example:

lazy val kinesisConfig: KinesisClientLibConfiguration =
  new KinesisClientLibConfiguration(
    getClass.getName,         // DynamoDB shard lease table name
    streamArn,                // pulled from the dynamo table at runtime
    dynamoCredentials,        // DefaultAWSCredentialsProviderChain
    KeywordTrackingActor.NAME // Lease owner name
  ).withMaxRecords(1000)                   // using AWS recommended value
    .withIdleTimeBetweenReadsInMillis(500) // using AWS recommended value
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)

val worker = new com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker(recordProcessorFactory, kinesisConfig)
0
votes

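Just to answer what the problem was: you were providing the ARN when the Kinesis client just wanted a stream name. As the error message shows, the request was validated by the Kinesis service, which only accepts names matching [a-zA-Z0-9_.-]+.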

0
votes

I recently contributed a PR to the gfc-aws-kinesis project, so you can now use it by passing in the adapter client and writing a KinesisRecordReader implementation.

In the example below I'm using Scanamo to parse the new-image map from the record.

Create the client:

val streamAdapterClient: AmazonDynamoDBStreamsAdapterClient =
    new AmazonDynamoDBStreamsAdapterClient()

Pass it in the configuration:

val streamConfig = KinesisStreamConsumerConfig[Option[A]](
  applicationName,
  config.stream, //the full dynamodb stream arn
  regionName = Some(config.region),
  checkPointInterval = config.checkpointInterval,
  initialPositionInStream = config.streamPosition,
  dynamoDBKinesisAdapterClient = Some(streamAdapterClient)
)
KinesisStreamSource(streamConfig).mapMaterializedValue(_ => NotUsed)

Create an implicit record reader suitable for reading dynamodb events:

implicit val kinesisRecordReader
  : KinesisRecordReader[Option[A]] =
  new KinesisRecordReader[Option[A]] {
    override def apply(record: Record): Option[A] = {
      record match {
        case recordAdapter: RecordAdapter =>
          val dynamoRecord: DynamoRecord =
            recordAdapter.getInternalObject
          dynamoRecord.getEventName match {
            case "INSERT" =>
              ScanamoFree
                .read[A](
                  dynamoRecord.getDynamodb.getNewImage)
                .toOption
            case _ => None
          }
        case _ => None
      }
    }
  }