I have written code that delivers records from a Kinesis stream to a Lambda function, which outputs a payload containing the data, partition key, and sequence number. I then try to invoke a second Lambda that receives the sequence number and partition key from the first Lambda and pulls the data from the Kinesis stream itself. I'm stuck on getting data from the Kinesis stream using the sequence number and partition key.

Below is the code for invoking one Lambda from another.

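import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.lambda.AWSLambdaClient;
import com.amazonaws.services.lambda.model.InvokeRequest;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord;

public class LambdaFunctionHandler implements RequestHandler<KinesisEvent, Object> {
    private static final String regionName = "us-east-1";
    private static final String functionName = "Test1";

    @Override
    public Object handleRequest(KinesisEvent input, Context context) {
        context.getLogger().log("Input: " + input);
        List<KinesisEventRecord> records = input.getRecords();
        for (KinesisEventRecord rec : records) {
            // Decode the record payload as UTF-8 text without relying on a backing array
            ByteBuffer recdata = rec.getKinesis().getData();
            String data = StandardCharsets.UTF_8.decode(recdata).toString();
            context.getLogger().log("Data: " + data);
            context.getLogger().log("Partition key: " + rec.getKinesis().getPartitionKey());
            context.getLogger().log("Sequence Number: " + rec.getKinesis().getSequenceNumber());
        }

        // Call another Lambda function (the payload is still a fixed test string)
        try {
            AWSLambdaClient lambda = new AWSLambdaClient();
            Region region = Region.getRegion(Regions.fromName(regionName));
            lambda.setRegion(region);
            InvokeRequest invokeRequest = new InvokeRequest();
            invokeRequest.setFunctionName(functionName);
            invokeRequest.setPayload("\" AWS Lambda Test - internal call\"");
            // The response payload is a ByteBuffer, so decode it before printing
            ByteBuffer response = lambda.invoke(invokeRequest).getPayload();
            System.out.println(StandardCharsets.UTF_8.decode(response).toString());
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }

        // TODO: implement your handler
        return null;
    }
}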
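
The handler above logs the partition key and sequence number but still sends a fixed test string to the second Lambda. As a rough sketch of how those two values could be packed into the invoke payload instead, assuming the second Lambda accepts a small JSON object: the class name `RecordPointerPayload`, the method `toJson`, and the JSON field names are hypothetical and not part of any AWS library.

```java
class RecordPointerPayload {

    // Escape characters that would otherwise break a JSON string literal.
    static String escape(String value) {
        StringBuilder sb = new StringBuilder();
        for (char c : value.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the \\uXXXX form
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Build a JSON object with the (hypothetical) fields partitionKey and sequenceNumber.
    static String toJson(String partitionKey, String sequenceNumber) {
        return "{\"partitionKey\":\"" + escape(partitionKey)
                + "\",\"sequenceNumber\":\"" + escape(sequenceNumber) + "\"}";
    }

    public static void main(String[] args) {
        // Sample values taken from the second snippet in this question
        System.out.println(toJson("123456676454", "12345"));
    }
}
```

Inside the record loop, the result could then be passed to `invokeRequest.setPayload(...)` in place of the test string, with `rec.getKinesis().getPartitionKey()` and `rec.getKinesis().getSequenceNumber()` as the arguments.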

Here is the code I have tried for getting records using the sequence number and partition key.

public class LambdaFunctionHandler implements RequestHandler<KinesisEvent, Object> {
    private static final String streamName = "Test";
    private static final String partitionKey = "123456676454";
    private static final String sequenceNumber = "12345";

    @Override
    public Object handleRequest(KinesisEvent input, Context context) {
        context.getLogger().log("Input: " + input);
        // Attempt only: GetRecordsRequest does not offer these setters and
        // KinesisEvent has no getRecord method, so this part does not compile.
        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
        getRecordsRequest.setStreamName(streamName);
        getRecordsRequest.setPartitionKey(partitionKey);
        getRecordsRequest.setSequenceNumber(sequenceNumber);
        KinesisEvent.getRecord(getRecordsRequest);
        return null;
    }
}

Please let me know how to get records from the Kinesis stream using the sequence number and partition key.

1 Answer

You cannot choose the stream you read from, or the position within it, from inside the Lambda function. Before the Lambda function runs, you need to create a mapping between your Kinesis stream and this Lambda function. The mapping then triggers the Lambda every time new records arrive.

Basically, a Lambda function cannot decide its own input.