3 votes

I'm trying to import a TSV file from S3 into DynamoDB using AWS Data Pipeline, but I keep hitting a MalformedJsonException. I've validated both pieces of JSON that I provide (the pipeline definition and the manifest of the S3 folder), so that's not the problem. Is there any way to figure out which JSON is malformed?
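
For what it's worth, this is roughly the check I ran on both files (a minimal sketch; the file names below are placeholders for my actual paths):

import json

# Parse each file with Python's json module, which reports the exact
# line and column of the first syntax error it finds.
for path in ["pipeline-definition.json", "manifest.json"]:
    with open(path) as f:
        try:
            json.load(f)
            print(f"{path}: OK")
        except json.JSONDecodeError as e:
            print(f"{path}: line {e.lineno}, column {e.colno}: {e.msg}")

Both files parse cleanly.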

Definition of the job:

{
  "objects": [
    {
      "output": {
        "ref": "DDBDestinationTable"
      },
      "input": {
        "ref": "S3InputDataNode"
      },
      "maximumRetries": "2",
      "name": "TableLoadActivity",
      "step": "s3://dynamodb-emr-#{myDDBRegion}/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbImport,#{input.directoryPath},#{output.tableName},#{output.writeThroughputPercent}",
      "runsOn": {
        "ref": "EmrClusterForLoad"
      },
      "id": "TableLoadActivity",
      "type": "EmrActivity",
      "resizeClusterBeforeRunning": "true"
    },
    {
      "column": [
        "property_id STRING",
        "addr_line_1 STRING",
        ...
      ],
      "name": "DefaultDataFormat1",
      "id": "DataFormatId_JMZkM",
      "type": "TSV"
    },
    {
      "bootstrapAction": "s3://#{myDDBRegion}.elasticmapreduce/bootstrap-actions/configure-hadoop, --mapred-key-value,mapreduce.map.speculative=false",
      "name": "EmrClusterForLoad",
      "coreInstanceCount": "1",
      "coreInstanceType": "m3.xlarge",
      "amiVersion": "3.9.0",
      "id": "EmrClusterForLoad",
      "masterInstanceType": "m3.xlarge",
      "region": "#{myDDBRegion}",
      "type": "EmrCluster",
      "terminateAfter": "1 Month"
    },
    {
      "directoryPath": "#{myInputS3Loc}",
      "dataFormat": {
        "ref": "DataFormatId_JMZkM"
      },
      "name": "S3InputDataNode",
      "id": "S3InputDataNode",
      "type": "S3DataNode"
    },
    {
      "writeThroughputPercent": "#{myDDBWriteThroughputRatio}",
      "name": "DDBDestinationTable",
      "id": "DDBDestinationTable",
      "type": "DynamoDBDataNode",
      "tableName": "#{myDDBTableName}"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "s3://log-bucket/",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    }
  ],
  "parameters": [
    {
      "description": "Input S3 folder",
      "id": "myInputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "Target DynamoDB table name",
      "id": "myDDBTableName",
      "type": "String"
    },
    {
      "default": "0.25",
      "watermark": "Enter value between 0.1-1.0",
      "description": "DynamoDB write throughput ratio",
      "id": "myDDBWriteThroughputRatio",
      "type": "Double"
    },
    {
      "default": "us-east-1",
      "watermark": "us-east-1",
      "description": "Region of the DynamoDB table",
      "id": "myDDBRegion",
      "type": "String"
    }
  ],
  "values": {
    "myDDBRegion": "us-east-1",
    "myDDBTableName": "TableName",
    "myDDBWriteThroughputRatio": "0.5",
    "myInputS3Loc": "s3://input/folder/"
  }
}

Exception:

24 Jan 2018 23:59:56,657 [INFO] (TaskRunnerService-df-02737991EW1XAIM4T1PD_@EmrClusterForLoad_2018-01-24T23:27:35-0) df-02737991EW1XAIM4T1PD amazonaws.datapipeline.taskrunner.LogMessageUtil: Returning tail errorMsg :    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:170)
Caused by: com.google.gson.stream.MalformedJsonException: Expected ':' at line 1 column 36
    at com.google.gson.stream.JsonReader.syntaxError(JsonReader.java:1298)
    at com.google.gson.stream.JsonReader.objectValue(JsonReader.java:762)
    at com.google.gson.stream.JsonReader.peek(JsonReader.java:380)
    at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:158)
    at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.read(TypeAdapterRuntimeTypeWrapper.java:40)
    at com.google.gson.internal.bind.MapTypeAdapterFactory$Adapter.read(MapTypeAdapterFactory.java:188)
    at com.google.gson.internal.bind.MapTypeAdapterFactory$Adapter.read(MapTypeAdapterFactory.java:146)
    at com.google.gson.Gson.fromJson(Gson.java:755)
    ... 17 more

Exception in thread "main" java.io.IOException: Job failed!
    at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:836)
    at org.apache.hadoop.dynamodb.tools.DynamoDbImport.run(DynamoDbImport.java:68)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.hadoop.dynamodb.tools.DynamoDbImport.main(DynamoDbImport.java:30)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

1 Answer

0 votes

In the AWS console, click Create new pipeline, choose Import a definition, and see whether your JSON can be imported correctly.

Are you creating the pipeline from the command line? I suspect there is some problem with that command.
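
If so, one way to get the service's own validation feedback (rather than digging through the EMR logs) is to push the definition through the API, for example with boto3. This is only a sketch: the API takes objects in its key/value "fields" format, which is not the same as the console-export JSON you posted, so the object list below is a placeholder showing just the Default object.

import boto3

client = boto3.client("datapipeline", region_name="us-east-1")

# Create a throwaway pipeline to validate the definition against.
pipeline_id = client.create_pipeline(
    name="tsv-import-validation",
    uniqueId="tsv-import-validation",
)["pipelineId"]

# The API's object format uses key/stringValue/refValue fields; this list is
# a placeholder for your full definition translated into that format.
pipeline_objects = [
    {
        "id": "Default",
        "name": "Default",
        "fields": [
            {"key": "scheduleType", "stringValue": "ONDEMAND"},
            {"key": "failureAndRerunMode", "stringValue": "CASCADE"},
            {"key": "role", "stringValue": "DataPipelineDefaultRole"},
            {"key": "resourceRole", "stringValue": "DataPipelineDefaultResourceRole"},
            {"key": "pipelineLogUri", "stringValue": "s3://log-bucket/"},
        ],
    },
    # ... the rest of your objects ...
]

resp = client.put_pipeline_definition(
    pipelineId=pipeline_id,
    pipelineObjects=pipeline_objects,
)
print(resp.get("validationErrors", []))
print(resp.get("validationWarnings", []))

If the API accepts the definition without validation errors, that at least rules the definition itself out.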

I assume the ... is just a placeholder and not present in your actual JSON :)