I'm attempting to migrate CSV data from S3 to DynamoDB using Data Pipeline. The data is not in a DynamoDB export format but instead in a normal CSV.

I understand that Data Pipeline is more typically used to import or export data in the DynamoDB export format rather than standard CSV. From my searching, I believe it is possible to use a plain CSV file, but I haven't been able to put together something that works. The AWS documentation hasn't been terribly helpful either, and I haven't been able to find any reference posts that are relatively recent (less than 2 years old).

If this is possible, can anyone provide some insight on why my pipeline may not be working? I've pasted the pipeline and error message below. The error seems to indicate an issue plugging data into Dynamo, I'm guessing because it's not in the export format.

I'd do it in Lambda but the data load takes longer than 15 minutes.

Thanks

{
  "objects": [
    {
      "myComment": "Activity used to run the hive script to import CSV data",
      "output": {
        "ref": "dynamoDataTable"
      },
      "input": {
        "ref": "s3csv"
      },
      "name": "S3toDynamoLoader",
      "hiveScript": "DROP TABLE IF EXISTS tempHiveTable;\n\nDROP TABLE IF EXISTS s3TempTable;\n\nCREATE EXTERNAL TABLE tempHiveTable (#{myDDBColDef}) \nSTORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' \nTBLPROPERTIES (\"dynamodb.table.name\" = \"#{myDDBTableName}\", \"dynamodb.column.mapping\" = \"#{myDDBTableColMapping}\");\n                    \nCREATE EXTERNAL TABLE s3TempTable (#{myS3ColDef}) \nROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\\n' LOCATION '#{myInputS3Loc}';\n                    \nINSERT OVERWRITE TABLE tempHiveTable SELECT * FROM s3TempTable;",
      "id": "S3toDynamoLoader",
      "runsOn": { "ref": "EmrCluster" },
      "stage": "false",
      "type": "HiveActivity"
    },
    {
      "myComment": "The DynamoDB table that we are uploading to",
      "name": "DynamoDB",
      "id": "dynamoDataTable",
      "type": "DynamoDBDataNode",
      "tableName": "#{myDDBTableName}",
      "writeThroughputPercent": "1.0",
      "dataFormat": {
        "ref": "DDBTableFormat"
      }
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "#{myLogUri}",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    },
    {
      "name": "EmrCluster",
      "coreInstanceType": "m1.medium",
      "coreInstanceCount": "1",
      "masterInstanceType": "m1.medium",
      "releaseLabel": "emr-5.29.0",
      "id": "EmrCluster",
      "type": "EmrCluster",
      "terminateAfter": "2 Hours"
    },
    {
      "myComment": "The S3 file that contains the data we're importing",
      "directoryPath": "#{myInputS3Loc}",
      "dataFormat": {
        "ref": "csvFormat"
      },
      "name": "S3DataNode",
      "id": "s3csv",
      "type": "S3DataNode"
    },
    {
      "myComment": "Format for the S3 Path",
      "name": "S3ExportFormat",
      "column": "not_used STRING",
      "id": "csvFormat",
      "type": "CSV"
    },
    {
      "myComment": "Format for the DynamoDB table",
      "name": "DDBTableFormat",
      "id": "DDBTableFormat",
      "column": "not_used STRING",
      "type": "DynamoDBExportDataFormat"
    }
  ],
  "parameters": [
    {
      "description": "S3 Column Mappings",
      "id": "myS3ColDef",
      "default": "phoneNumber string,firstName string,lastName string, spend double",
      "type": "String"
    },
    {
      "description": "DynamoDB Column Mappings",
      "id": "myDDBColDef",
      "default": "phoneNumber String,firstName String,lastName String, spend double",
      "type": "String"
    },
    {
      "description": "Input S3 folder",
      "id": "myInputS3Loc",
      "default": "s3://POCproject-dev1-data/upload/",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "DynamoDB table name",
      "id": "myDDBTableName",
      "default": "POCproject-pipeline-data",
      "type": "String"
    },
    {
      "description": "S3 to DynamoDB Column Mapping",
      "id": "myDDBTableColMapping",
      "default": "phoneNumber:phoneNumber,firstName:firstName,lastName:lastName,spend:spend",
      "type": "String"
    },
    {
      "description": "DataPipeline Log Uri",
      "id": "myLogUri",
      "default": "s3://POCproject-dev1-data/",
      "type": "AWS::S3::ObjectKey"
    }
  ]
}

Error

[INFO] (TaskRunnerService-df-09432511OLZUA8VN0NLE_@EmrCluster_2020-03-06T02:52:47-0) df-09432511OLZUA8VN0NLE amazonaws.datapipeline.taskrunner.LogMessageUtil: Returning tail errorMsg :Caused by: java.lang.RuntimeException: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: One or more parameter values were invalid: An AttributeValue may not contain an empty string (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: UM56KGVOU511P6LS7LP1N0Q4HRVV4KQNSO5AEMVJF66Q9ASUAAJG)
    at org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.handleException(DynamoDBFibonacciRetryer.java:108)
    at org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.runWithRetry(DynamoDBFibonacciRetryer.java:83)
    at org.apache.hadoop.dynamodb.DynamoDBClient.writeBatch(DynamoDBClient.java:258)
    at org.apache.hadoop.dynamodb.DynamoDBClient.putBatch(DynamoDBClient.java:215)
    at org.apache.hadoop.dynamodb.write.AbstractDynamoDBRecordWriter.write(AbstractDynamoDBRecordWriter.java:112)
    at org.apache.hadoop.hive.dynamodb.write.HiveDynamoDBRecordWriter.write(HiveDynamoDBRecordWriter.java:42)
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:762)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:897)
    at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:897)
    at org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:130)
    at org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:148)
    at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:550)
    ... 18 more
Caused by: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: One or more parameter values were invalid: An AttributeValue may not contain an empty string (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: UM56KGVOU511P6LS7LP1N0Q4HRVV4KQNSO5AEMVJF66Q9ASUAAJG)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)

1 Answer

Have you tried this sample yet? It uses Hive to import a CSV file into a DynamoDB table: https://github.com/aws-samples/data-pipeline-samples/tree/master/samples/DynamoDBImportCSV
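
Separately, the error message ("An AttributeValue may not contain an empty string") suggests the write may be failing on empty fields in the CSV rather than on the file format itself. As a rough way to check, the Python sketch below lists the rows that have empty or missing fields. It assumes the file has no header row and uses the four columns from the myS3ColDef parameter in the question; the function name find_empty_fields and the sample rows are made up for illustration.

```python
import csv
import io

# Column names taken from the question's myS3ColDef parameter.
COLUMNS = ["phoneNumber", "firstName", "lastName", "spend"]


def find_empty_fields(csv_text, columns=COLUMNS):
    """Return (row_number, [empty column names]) for rows with empty or missing fields.

    Hypothetical helper for illustration; assumes a header-less, comma-separated file.
    """
    problems = []
    reader = csv.reader(io.StringIO(csv_text))
    for row_number, row in enumerate(reader, start=1):
        # Pad short rows so that missing trailing fields count as empty.
        padded = row + [""] * (len(columns) - len(row))
        empty = [name for name, value in zip(columns, padded) if value.strip() == ""]
        if empty:
            problems.append((row_number, empty))
    return problems


# Made-up sample data: row 2 lacks a first name, row 3 lacks a last name and spend.
sample = "5551234,Ann,Lee,10.5\n5555678,,Kim,3.0\n5559999,Bo,,\n"
for row_number, empty in find_empty_fields(sample):
    print(row_number, empty)
```

If this reports any rows, cleaning or filtering them before the import (or handling them in the Hive SELECT) would be the next thing to try.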