I am trying to transfer CSV data from an S3 bucket to DynamoDB using AWS Data Pipeline. Below is my pipeline script; it is not working properly.

CSV file structure

Name, Designation,Company

A,TL,C1

B,Prog, C2

DynamoDB table: N_Table, with Name as the hash key

{
"objects": [
    {
        "id": "Default",
        "scheduleType": "cron",
        "name": "Default",
        "role": "DataPipelineDefaultRole",
        "resourceRole": "DataPipelineDefaultResourceRole"
    },
    {
        "id": "DynamoDBDataNodeId635",
        "schedule": {
            "ref": "ScheduleId639"
        },
        "tableName": "N_Table",
        "name": "MyDynamoDBData",
        "type": "DynamoDBDataNode"
    },
    {
        "emrLogUri": "s3://onlycsv/error",
        "id": "EmrClusterId636",
        "schedule": {
            "ref": "ScheduleId639"
        },
        "masterInstanceType": "m1.small",
        "coreInstanceType": "m1.xlarge",
        "enableDebugging": "true",
        "installHive": "latest",
        "name": "ImportCluster",
        "coreInstanceCount": "1",
        "logUri": "s3://onlycsv/error1",
        "type": "EmrCluster"
    },
    {
        "id": "S3DataNodeId643",
        "schedule": {
            "ref": "ScheduleId639"
        },
        "directoryPath": "s3://onlycsv/data.csv",
        "name": "MyS3Data",
        "dataFormat": {
            "ref": "DataFormatId1"
        },
        "type": "S3DataNode"
    },
    {
        "id": "ScheduleId639",
        "startDateTime": "2013-08-03T00:00:00",
        "name": "ImportSchedule",
        "period": "1 Hours",
        "type": "Schedule",
        "endDateTime": "2013-08-04T00:00:00"
    },
    {
        "id": "EmrActivityId637",
        "input": {
            "ref": "S3DataNodeId643"
        },
        "schedule": {
            "ref": "ScheduleId639"
        },
        "name": "MyImportJob",
        "runsOn": {
            "ref": "EmrClusterId636"
        },
        "maximumRetries": "0",
        "myDynamoDBWriteThroughputRatio": "0.25",
        "attemptTimeout": "24 hours",
        "type": "EmrActivity",
        "output": {
            "ref": "DynamoDBDataNodeId635"
        },
        "step": "s3://elasticmapreduce/libs/script-runner/script-runner.jar,s3://elasticmapreduce/libs/hive/hive-script,--run-hive-script,--hive-versions,latest,--args,-f,s3://elasticmapreduce/libs/hive/dynamodb/importDynamoDBTableFromS3,-d,DYNAMODB_OUTPUT_TABLE=#{output.tableName},-d,S3_INPUT_BUCKET=#{input.directoryPath},-d,DYNAMODB_WRITE_PERCENT=#{myDynamoDBWriteThroughputRatio},-d,DYNAMODB_ENDPOINT=dynamodb.us-east-1.amazonaws.com"
    },
    {
        "id": "DataFormatId1",
        "name": "DefaultDataFormat1",
        "column": [
            "Name",
            "Designation",
            "Company"
        ],
        "columnSeparator": ",",
        "recordSeparator": "\n",
        "type": "Custom"
    }
]

}

Of the four steps in the pipeline, two finish during execution, but the pipeline does not run to completion.

4 Answers

5
votes

As of April 2015, the default import pipeline template does not support importing CSV files.

If your CSV file is not too big (under 1 GB or so), you can create a ShellCommandActivity to convert the CSV to DynamoDB JSON format first, and then feed that to an EmrActivity that imports the resulting JSON file into your table.

As a first step, create a sample DynamoDB table including all the field types you need, populate it with dummy values, and then export the records using a pipeline (the Export/Import button in the DynamoDB console). This will show you the format the import pipeline expects. The type names are not obvious, and the import activity is very sensitive to the correct case (e.g. you must write bOOL for a boolean field).

Afterwards it should be easy to create an awk script (or any other text converter; at least with awk you can use the default AMI image for your shell activity) and feed it to your ShellCommandActivity. Don't forget to enable the "staging" flag so your output is uploaded back to S3 for the import activity to pick up.
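The conversion step could be sketched in awk along these lines. This is only a sketch: the file names are placeholders (in a real ShellCommandActivity the staged input arrives via ${INPUT1_STAGING_DIR}), and the line-per-item format with lowercased type tags (e.g. "s" for string, consistent with the bOOL note above) should be verified against your own table's export.

```shell
# Sample input mirroring the question's CSV; in the real pipeline this
# file would come from the staging directory.
printf 'Name,Designation,Company\nA,TL,C1\nB,Prog, C2\n' > data.csv

# Emit one DynamoDB-JSON item per line, skipping the header row.
# The "s" (string) type tag is an assumption; check it against a
# sample export from your own table.
awk -F',' 'NR > 1 {
    gsub(/^ +| +$/, "", $2); gsub(/^ +| +$/, "", $3)   # trim stray spaces
    printf "{\"Name\":{\"s\":\"%s\"},\"Designation\":{\"s\":\"%s\"},\"Company\":{\"s\":\"%s\"}}\n", $1, $2, $3
}' data.csv > dynamodb-items.json
```

With staging enabled, writing the result into ${OUTPUT1_STAGING_DIR} would let the pipeline upload it back to S3 automatically.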

3
votes

If you are using the template data pipeline for importing data from S3 to DynamoDB, these data formats won't work. Instead, store the input S3 data file in the format described here: http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-importexport-ddb-pipelinejson-verifydata2.html

This is the format of the output file generated by the template data pipeline that exports data from DynamoDB to S3.

Hope that helps.

0
votes

I would recommend using the CSV data format provided by Data Pipeline instead of a custom one.
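For instance, the "Custom" data-format node from the question could be swapped for the built-in CSV object. This is a sketch based on the question's node; the column entries (name plus data type) are my assumption, so check them against the Data Pipeline CSV Data Format reference:

```json
{
    "id": "DataFormatId1",
    "name": "DefaultDataFormat1",
    "column": [
        "Name STRING",
        "Designation STRING",
        "Company STRING"
    ],
    "type": "CSV"
}
```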

For debugging errors on the cluster, look up the job flow in the EMR console and inspect the log files of the tasks that failed.

0
votes

See the link below for a solution that works (in the question section), albeit on EMR 3.x. Just change the delimiter to "columnSeparator": ",". Personally, I wouldn't use CSV unless you are certain the data is sanitized correctly.

How to upgrade Data Pipeline definition from EMR 3.x to 4.x/5.x?