I'm trying to import a TSV file from S3 into DynamoDB using AWS Data Pipeline, but I keep hitting a MalformedJsonException. I've validated both pieces of JSON that I provide (the pipeline definition and the manifest of the S3 folder), so that isn't the problem. Is there any way to figure out which JSON the job thinks is malformed?
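For reference, this is roughly how I validated both files locally, a minimal sketch using Gson since that's what the failing job uses (the filenames are just placeholders for my local copies). Both parse without complaint:

import com.google.gson.JsonParser;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ValidateJson {
    public static void main(String[] args) throws Exception {
        // Placeholder names for local copies of the two JSON files I supply.
        String[] files = {"pipeline-definition.json", "manifest.json"};
        for (String file : files) {
            String text = new String(Files.readAllBytes(Paths.get(file)));
            // Throws JsonSyntaxException on broken input. Note that Gson's
            // parser is lenient by default, so this is not a fully strict check.
            new JsonParser().parse(text);
            System.out.println(file + " parses");
        }
    }
}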
Definition of the job:
{
  "objects": [
    {
      "output": {
        "ref": "DDBDestinationTable"
      },
      "input": {
        "ref": "S3InputDataNode"
      },
      "maximumRetries": "2",
      "name": "TableLoadActivity",
      "step": "s3://dynamodb-emr-#{myDDBRegion}/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbImport,#{input.directoryPath},#{output.tableName},#{output.writeThroughputPercent}",
      "runsOn": {
        "ref": "EmrClusterForLoad"
      },
      "id": "TableLoadActivity",
      "type": "EmrActivity",
      "resizeClusterBeforeRunning": "true"
    },
    {
      "column": [
        "property_id STRING",
        "addr_line_1 STRING",
        ...
      ],
      "name": "DefaultDataFormat1",
      "id": "DataFormatId_JMZkM",
      "type": "TSV"
    },
    {
      "bootstrapAction": "s3://#{myDDBRegion}.elasticmapreduce/bootstrap-actions/configure-hadoop, --mapred-key-value,mapreduce.map.speculative=false",
      "name": "EmrClusterForLoad",
      "coreInstanceCount": "1",
      "coreInstanceType": "m3.xlarge",
      "amiVersion": "3.9.0",
      "id": "EmrClusterForLoad",
      "masterInstanceType": "m3.xlarge",
      "region": "#{myDDBRegion}",
      "type": "EmrCluster",
      "terminateAfter": "1 Month"
    },
    {
      "directoryPath": "#{myInputS3Loc}",
      "dataFormat": {
        "ref": "DataFormatId_JMZkM"
      },
      "name": "S3InputDataNode",
      "id": "S3InputDataNode",
      "type": "S3DataNode"
    },
    {
      "writeThroughputPercent": "#{myDDBWriteThroughputRatio}",
      "name": "DDBDestinationTable",
      "id": "DDBDestinationTable",
      "type": "DynamoDBDataNode",
      "tableName": "#{myDDBTableName}"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "s3://log-bucket/",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    }
  ],
  "parameters": [
    {
      "description": "Input S3 folder",
      "id": "myInputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "Target DynamoDB table name",
      "id": "myDDBTableName",
      "type": "String"
    },
    {
      "default": "0.25",
      "watermark": "Enter value between 0.1-1.0",
      "description": "DynamoDB write throughput ratio",
      "id": "myDDBWriteThroughputRatio",
      "type": "Double"
    },
    {
      "default": "us-east-1",
      "watermark": "us-east-1",
      "description": "Region of the DynamoDB table",
      "id": "myDDBRegion",
      "type": "String"
    }
  ],
  "values": {
    "myDDBRegion": "us-east-1",
    "myDDBTableName": "TableName",
    "myDDBWriteThroughputRatio": "0.5",
    "myInputS3Loc": "s3://input/folder/"
  }
}
Exception:
24 Jan 2018 23:59:56,657 [INFO] (TaskRunnerService-df-02737991EW1XAIM4T1PD_@EmrClusterForLoad_2018-01-24T23:27:35-0) df-02737991EW1XAIM4T1PD amazonaws.datapipeline.taskrunner.LogMessageUtil: Returning tail errorMsg : at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:170)
Caused by: com.google.gson.stream.MalformedJsonException: Expected ':' at line 1 column 36
at com.google.gson.stream.JsonReader.syntaxError(JsonReader.java:1298)
at com.google.gson.stream.JsonReader.objectValue(JsonReader.java:762)
at com.google.gson.stream.JsonReader.peek(JsonReader.java:380)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:158)
at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.read(TypeAdapterRuntimeTypeWrapper.java:40)
at com.google.gson.internal.bind.MapTypeAdapterFactory$Adapter.read(MapTypeAdapterFactory.java:188)
at com.google.gson.internal.bind.MapTypeAdapterFactory$Adapter.read(MapTypeAdapterFactory.java:146)
at com.google.gson.Gson.fromJson(Gson.java:755)
... 17 more
Exception in thread "main" java.io.IOException: Job failed!
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:836)
at org.apache.hadoop.dynamodb.tools.DynamoDbImport.run(DynamoDbImport.java:68)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.dynamodb.tools.DynamoDbImport.main(DynamoDbImport.java:30)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
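Since the MalformedJsonException comes from Gson inside the import job, I tried to reproduce the error class locally. A minimal sketch (the input string is made up; I'm only showing what kind of input produces "Expected ':'"):

import com.google.gson.Gson;
import java.util.Map;

public class ExpectedColonRepro {
    public static void main(String[] args) {
        // A made-up string: an object member name with no ':' before its
        // value. Gson rejects this with the same "Expected ':' at line 1
        // column N" message that appears in the job's stack trace.
        String notQuiteJson = "{\"property_id\" \"12345\"}";
        new Gson().fromJson(notQuiteJson, Map.class);
    }
}

That reproduces the error class but not the cause: I still can't tell which JSON document the EMR step is actually feeding to Gson, or which record it choked on.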