I'm trying to figure out how I can create an AWS Data Pipeline that takes a JSON file from S3 and imports it into a DynamoDB table. I've been able to write some Java code that achieves this, but I want to do it through Data Pipeline. I can see there are templates for exporting from DynamoDB to S3 and for importing a backup, but I'm struggling to figure out how to import a plain JSON file.
1 Answer
In the documentation you will find an example of importing and exporting data from DynamoDB (http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-importexport-ddb.html).
Here's how it is described in the documentation:
To create the pipeline

1. Open the AWS Data Pipeline console.
2. The first screen that you see depends on whether you've created a pipeline in the current region.
   - If you haven't created a pipeline in this region, the console displays an introductory screen. Choose Get started now.
   - If you've already created a pipeline in this region, the console displays a page that lists your pipelines for the region. Choose Create new pipeline.
3. In Name, enter a name for your pipeline.
4. (Optional) In Description, enter a description for your pipeline.
5. For Source, select Build using a template, and then select the following template: Import DynamoDB backup data from S3.
6. Under Parameters, set Input S3 folder to s3://elasticmapreduce/samples/Store/ProductCatalog, which is a sample data source, and set DynamoDB table name to the name of your table. (The same values can also be kept in a parameter-values file; see the sketch after this procedure.)
7. Under Schedule, choose on pipeline activation.
8. Under Pipeline Configuration, leave logging enabled. Choose the folder icon under S3 location for logs, select one of your buckets or folders, and then choose Select. If you prefer, you can disable logging instead.
9. Under Security/Access, leave IAM roles set to Default.
10. Click Edit in Architect.
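If you'd rather keep those parameter values in a file (for example, to load the same definition later with the CLI's put-pipeline-definition call), they go into a small JSON document. A minimal sketch, assuming the template's parameter IDs are myInputS3Loc and myDDBTableName (both are assumptions here; check the template's parameters section for the exact IDs):

{
  "values": {
    "myInputS3Loc": "s3://elasticmapreduce/samples/Store/ProductCatalog",
    "myDDBTableName": "ProductCatalog"
  }
}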
Next, configure the Amazon SNS notification actions that AWS Data Pipeline performs depending on the outcome of the activity.
To configure the success and failure actions

1. In the right pane, click Activities.
2. From Add an optional field, select On Success.
3. From the newly added On Success, select Create new: Action.
4. From Add an optional field, select On Fail.
5. From the newly added On Fail, select Create new: Action.
6. In the right pane, click Others.
7. For DefaultAction1, do the following:
   - Change the name to SuccessSnsAlarm.
   - From Type, select SnsAlarm.
   - In Topic Arn, enter the ARN of the topic that you created.
   - Enter a subject and a message.
8. For DefaultAction2, do the following:
   - Change the name to FailureSnsAlarm.
   - From Type, select SnsAlarm.
   - In Topic Arn, enter the ARN of the topic that you created.
   - Enter a subject and a message.
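In the pipeline definition JSON, those two actions show up as SnsAlarm objects that the activity references through its onSuccess and onFail fields. A minimal sketch, assuming a placeholder topic ARN and the default pipeline role (the subject and message text are placeholders too):

{
  "id": "SuccessSnsAlarm",
  "name": "SuccessSnsAlarm",
  "type": "SnsAlarm",
  "topicArn": "arn:aws:sns:us-east-1:123456789012:my-topic",
  "role": "DataPipelineDefaultRole",
  "subject": "Import succeeded",
  "message": "The DynamoDB import activity completed successfully."
},
{
  "id": "FailureSnsAlarm",
  "name": "FailureSnsAlarm",
  "type": "SnsAlarm",
  "topicArn": "arn:aws:sns:us-east-1:123456789012:my-topic",
  "role": "DataPipelineDefaultRole",
  "subject": "Import failed",
  "message": "The DynamoDB import activity failed."
}

The activity object would then carry "onSuccess": { "ref": "SuccessSnsAlarm" } and "onFail": { "ref": "FailureSnsAlarm" }.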
There are also samples for working with DynamoDB in the public data-pipeline-samples repository on GitHub (https://github.com/awslabs/data-pipeline-samples). Here's an example of a pipeline definition (this particular one exports a table to S3; see the note after the definition for the import direction):
{
  "objects": [
    {
      "occurrences": "1",
      "period": "1 Day",
      "name": "RunOnce",
      "id": "DefaultSchedule",
      "type": "Schedule",
      "startAt": "FIRST_ACTIVATION_DATE_TIME",
      "maxActiveInstances": "1"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "schedule": {
        "ref": "DefaultSchedule"
      },
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "s3://",
      "scheduleType": "cron",
      "name": "Default",
      "id": "Default"
    },
    {
      "maximumRetries": "2",
      "name": "TableBackupActivity",
      "step": "s3://dynamodb-emr-us-east-1/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbExport,#{myOutputS3Loc}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')},#{myDDBTableName},#{myDDBReadThroughputRatio}",
      "id": "TableBackupActivity",
      "runsOn": {
        "ref": "EmrClusterForBackup"
      },
      "type": "EmrActivity"
    },
    {
      "bootstrapAction": "s3://elasticmapreduce/bootstrap-actions/configure-hadoop, --yarn-key-value, yarn.nodemanager.resource.memory-mb=12800,--yarn-key-value,yarn.scheduler.minimum-allocation-mb=256,--mapred-key-value,mapreduce.map.memory.mb=500,--mapred-key-value,mapreduce.map.java.opts=-Xmx400M,--mapred-key-value,mapreduce.job.reduce.slowstart.completedmaps=1,--mapred-key-value,mapreduce.map.speculative=false",
      "name": "EmrClusterForBackup",
      "amiVersion": "3.8.0",
      "id": "EmrClusterForBackup",
      "type": "EmrCluster",
      "masterInstanceType": "m1.medium",
      "coreInstanceType": "#{myInstanceType}",
      "coreInstanceCount": "#{myInstanceCount}",
      "terminateAfter": "12 hours"
    }
  ],
  "parameters": [
    {
      "description": "Output S3 folder",
      "id": "myOutputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "default": "0.2",
      "watermark": "Value between 0.1-1.0",
      "description": "DynamoDB Read Throughput Ratio",
      "id": "myDDBReadThroughputRatio",
      "type": "Double"
    },
    {
      "description": "DynamoDB Table Name",
      "id": "myDDBTableName",
      "type": "String"
    },
    {
      "description": "Instance Type",
      "id": "myInstanceType",
      "watermark": "Use m1.medium if Read Capacity Units for the job <= 900. Else use m3.xlarge",
      "type": "String",
      "default": "m3.xlarge"
    },
    {
      "description": "Instance Count",
      "watermark": "(Read Capacity Units / 300) for m1.medium if RCU <= 900. Else (RCU / 1500) for m3.xlarge",
      "id": "myInstanceCount",
      "type": "Integer",
      "default": "1"
    },
    {
      "description": "Burst IOPs",
      "watermark": "Add IOPS to the DDB table by this percent for the duration of the export job",
      "id": "myBurstIOPS",
      "type": "Double",
      "default": "0.0"
    }
  ]
}
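Note that the EmrActivity above runs DynamoDbExport, i.e. the backup (table-to-S3) direction. For the import direction the shape is the same, but the step invokes the import tool and the arguments become the input S3 folder, the table name, and a write (rather than read) throughput ratio. A rough sketch, assuming the import class shipped in the same emr-ddb jar is org.apache.hadoop.dynamodb.tools.DynamoDbImport and that you add myInputS3Loc and myDDBWriteThroughputRatio parameters (verify both names against the console's Import DynamoDB backup data from S3 template before relying on them):

{
  "maximumRetries": "2",
  "name": "TableImportActivity",
  "id": "TableImportActivity",
  "runsOn": {
    "ref": "EmrClusterForBackup"
  },
  "type": "EmrActivity",
  "step": "s3://dynamodb-emr-us-east-1/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbImport,#{myInputS3Loc},#{myDDBTableName},#{myDDBWriteThroughputRatio}"
}

Also keep in mind that this import step expects the files in the format produced by the corresponding export/backup, so a plain JSON file usually has to be converted into that format (or loaded another way, for example with your own code) before the template will accept it.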