2
votes

I have been developing a project utilizing the 1.0.0 Release of cloud stream kinesis binder (https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis). I have an inbound channel (kinesis stream 1) that is being consumed by my application. my application is essentially reading modifying and writing a new message to an outbound channel (kinesis stream 2).

I am able to read messages in batch 100,200 records at a time. But when it comes to writing to the outbound stream I don't see a way to configure the outbound channel to perform batch writing equivalent to kinesis stream API's PutRecordsRequest.

Has anyone been able to perform something like this with cloud stream output channels?

Thanks.

1

1 Answers

1
votes

You can just return a PutRecordsRequest from your @StreamListener and configure the .producer.useNativeEncoding = true for the output binding target. This way the result of the @StreamListener is not going to be converted to the byte[] and that will become a KinesisMessageHandler responsibility to handle that PutRecordsRequest payload properly. And that is possible there now:

if (message.getPayload() instanceof PutRecordsRequest) {
        AsyncHandler<PutRecordsRequest, PutRecordsResult> asyncHandler =
                obtainAsyncHandler(message, (PutRecordsRequest) message.getPayload());

        return this.amazonKinesis.putRecordsAsync((PutRecordsRequest) message.getPayload(), asyncHandler);
    }