I have written a Lambda function that receives records from a Kinesis stream and outputs each record's data, partition ID, and sequence number. That function then invokes a second Lambda, which should receive the sequence number and partition ID from the first one and use them to pull the data from the Kinesis stream. I'm stuck on getting data from the Kinesis stream using the sequence number and partition ID.
Below is the code that invokes the second Lambda from the first one.
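import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.lambda.AWSLambdaClient;
import com.amazonaws.services.lambda.model.InvokeRequest;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord;

public class LambdaFunctionHandler implements RequestHandler<KinesisEvent, Object> {
    private static final String regionName = "us-east-1";
    private static final String functionName = "Test1";

    @Override
    public Object handleRequest(KinesisEvent input, Context context) {
        context.getLogger().log("Input: " + input);
        List<KinesisEventRecord> records = input.getRecords();
        // Log the data, partition key and sequence number of every record in the batch
        for (KinesisEventRecord rec : records) {
            ByteBuffer recdata = rec.getKinesis().getData();
            String data = new String(recdata.array(), StandardCharsets.UTF_8);
            context.getLogger().log("Data: " + data);
            context.getLogger().log("Partition key: " + rec.getKinesis().getPartitionKey());
            context.getLogger().log("Sequence Number: " + rec.getKinesis().getSequenceNumber());
        }
        // Call another Lambda function
        try {
            AWSLambdaClient lambda = new AWSLambdaClient();
            Region region = Region.getRegion(Regions.fromName(regionName));
            lambda.setRegion(region);
            InvokeRequest invokeRequest = new InvokeRequest();
            invokeRequest.setFunctionName(functionName);
            // For now the payload is a fixed test string, not the record details
            invokeRequest.setPayload("\" AWS Lambda Test - internal call\"");
            // getPayload() returns a ByteBuffer, so decode it before printing
            ByteBuffer response = lambda.invoke(invokeRequest).getPayload();
            System.out.println(StandardCharsets.UTF_8.decode(response).toString());
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        // TODO: implement your handler
        return null;
    }
}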
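The call above only sends a fixed test string. As a minimal sketch of what I would like to send instead, the payload could be a small JSON object carrying the partition key and sequence number. The class `PayloadSketch`, the method `buildPayload`, and the JSON field names are hypothetical and not part of my handler, and the escaping is deliberately minimal:

```java
public class PayloadSketch {
    // Escapes backslashes and double quotes only; control characters are
    // not handled, so this is not a general-purpose JSON encoder.
    public static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds a JSON object with two string fields (hypothetical field names).
    public static String buildPayload(String partitionKey, String sequenceNumber) {
        return "{\"partitionKey\":\"" + escape(partitionKey)
                + "\",\"sequenceNumber\":\"" + escape(sequenceNumber) + "\"}";
    }

    public static void main(String[] args) {
        // Sample values taken from the constants used elsewhere in this post
        System.out.println(buildPayload("123456676454", "12345"));
    }
}
```

The resulting string could then be passed to `invokeRequest.setPayload(...)` in place of the test string.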
Here is the code I have tried for getting records using the sequence number and partition ID.
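import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class LambdaFunctionHandler implements RequestHandler<KinesisEvent, Object> {
    private static final String streamName = "Test";
    private static final String partitionKey = "123456676454";
    private static final String sequenceNumber = "12345";

    @Override
    public Object handleRequest(KinesisEvent input, Context context) {
        context.getLogger().log("Input: " + input);
        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
        // My attempt: set the stream, partition key and sequence number on the
        // request, then fetch the record. This is the part that does not work.
        getRecordsRequest.setStreamName(streamName);
        getRecordsRequest.setPartitionKey(partitionKey);
        getRecordsRequest.setsequenceNumber(sequenceNumber);
        KinesisEvent.getRecord(getRecord);
        return null;
    }
}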
Please let me know how to get records from the Kinesis stream using the sequence number and partition ID.
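One direction I am considering, as far as I understand the Kinesis API: `GetRecords` takes a shard iterator rather than a partition key, and `GetShardIterator` takes a stream name, a shard ID, and, with iterator type `AT_SEQUENCE_NUMBER`, a starting sequence number. So the second Lambda would probably need the shard ID instead of the partition key. Assuming the event ID delivered to the first Lambda has the form `shardId:sequenceNumber` (an assumption I still need to check against the logged input), a rough sketch of extracting both parts follows. The class `EventIdSketch`, the method `splitEventId`, and the sample value are hypothetical:

```java
public class EventIdSketch {
    // Splits an event ID of the assumed form "<shardId>:<sequenceNumber>"
    // into { shardId, sequenceNumber }. The format is an assumption.
    public static String[] splitEventId(String eventId) {
        int colon = eventId.lastIndexOf(':');
        if (colon <= 0 || colon == eventId.length() - 1) {
            throw new IllegalArgumentException("Unexpected event ID: " + eventId);
        }
        return new String[] {
            eventId.substring(0, colon),   // shard ID
            eventId.substring(colon + 1)   // sequence number
        };
    }

    public static void main(String[] args) {
        // Hypothetical sample value, not taken from a real stream
        String[] parts = splitEventId("shardId-000000000000:12345");
        System.out.println("Shard ID: " + parts[0]);
        System.out.println("Sequence number: " + parts[1]);
    }
}
```

The two values could then be sent to the second Lambda, which would request a shard iterator at that sequence number and pass the iterator to `GetRecords`. I have not verified this end to end.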