
I have an S3 JSON file that is 4 MB in size. I crawled the data using AWS Glue, which generated the corresponding data catalog table. I then created a job (using the ETL console in AWS Glue) to upload the data into Amazon Redshift.

Files with the same data format but smaller sizes load into the database properly. But when the file reaches 4 MB, the job fails with:

"An error occured while calling 0146.pyWriteDynamicFrame. Error (code 1204) whie loading data into REdshift. "String length exceeds DDL Length"

Can anyone help me solve this issue? My script is below; it was generated using the Glue console.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "tga", table_name = "db", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("attachments", "string", "attachments", "string"), ("classifications.Classification", "array", "`classifications.Classification`", "string"), ("code", "string", "code", "string"), ("completionmapping.NrtCompletion", "array", "`completionmapping.NrtCompletion`", "string"), ("componenttype", "string", "componenttype", "string"), ("contacts.Contact", "array", "`contacts.Contact`", "string"), ("createddate.DateTime", "string", "`createddate.DateTime`", "string"), ("createddate.OffsetMinutes", "string", "`createddate.OffsetMinutes`", "string"), ("currencyperiods.NrtCurrencyPeriod", "array", "`currencyperiods.NrtCurrencyPeriod`", "string"), ("currencystatus", "string", "currencystatus", "string"), ("datamanagers.DataManagerAssignment", "array", "`datamanagers.DataManagerAssignment`", "string"), ("islegacydata", "boolean", "islegacydata", "boolean"), ("isstreamlined", "boolean", "isstreamlined", "boolean"), ("mappinginformation.Mapping", "array", "`mappinginformation.Mapping`", "string"), ("recognitionmanagers.RecognitionManagerAssignment", "array", "`recognitionmanagers.RecognitionManagerAssignment`", "string"), ("restrictions", "string", "restrictions", "string"), ("reversemappinginformation.Mapping", "array", "`reversemappinginformation.Mapping`", "string"), ("title", "string", "title", "string"), ("updateddate.DateTime", "string", "`updateddate.DateTime`", "string"), ("updateddate.OffsetMinutes", "string", "`updateddate.OffsetMinutes`", "string"), ("_code", "string", "_code", "string"), ("_salesforceid", "string", "_salesforceid", "string"), ("_api", "string", "_api", "string"), ("_timestamp", "string", "_timestamp", "string"), ("parentcode", "string", "parentcode", "string"), ("parenttitle", "string", "parenttitle", "string"), ("releases.Release", "array", "`releases.Release`", "string"), ("usagerecommendations.UsageRecommendation", "array", "`usagerecommendations.UsageRecommendation`", "string")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "redshift", connection_options = {"dbtable": "trainingcomponentservicegetdetails", "database": "db"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
job.commit()

Scala script:

import com.amazonaws.services.glue.ChoiceOption
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.ResolveSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import org.apache.spark.sql.types.MetadataBuilder
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("TempDir","JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    val datasource0 = glueContext.getCatalogSource(database = "tga", tableName = "trainingcomponentservicegetdetails", redshiftTmpDir = "", transformationContext = "datasource0").getDynamicFrame()
    val applymapping1 = datasource0.applyMapping(mappings = Seq(("attachments", "string", "attachments", "string"), ("classifications.Classification", "array", "`classifications.Classification`", "string"), ("code", "string", "code", "string"), ("completionmapping.NrtCompletion", "array", "`completionmapping.NrtCompletion`", "string"), ("componenttype", "string", "componenttype", "string"), ("contacts.Contact", "array", "`contacts.Contact`", "string"), ("createddate.DateTime", "string", "`createddate.DateTime`", "string"), ("createddate.OffsetMinutes", "string", "`createddate.OffsetMinutes`", "string"), ("currencyperiods.NrtCurrencyPeriod", "array", "`currencyperiods.NrtCurrencyPeriod`", "string"), ("currencystatus", "string", "currencystatus", "string"), ("datamanagers.DataManagerAssignment", "array", "`datamanagers.DataManagerAssignment`", "string"), ("islegacydata", "boolean", "islegacydata", "boolean"), ("isstreamlined", "boolean", "isstreamlined", "boolean"), ("mappinginformation.Mapping", "array", "`mappinginformation.Mapping`", "string"), ("recognitionmanagers.RecognitionManagerAssignment", "array", "`recognitionmanagers.RecognitionManagerAssignment`", "string"), ("restrictions", "string", "restrictions", "string"), ("reversemappinginformation.Mapping", "array", "`reversemappinginformation.Mapping`", "string"), ("title", "string", "title", "string"), ("updateddate.DateTime", "string", "`updateddate.DateTime`", "string"), ("updateddate.OffsetMinutes", "string", "`updateddate.OffsetMinutes`", "string"), ("_code", "string", "_code", "string"), ("_salesforceid", "string", "_salesforceid", "string"), ("_api", "string", "_api", "string"), ("_timestamp", "long", "_timestamp", "long"), ("industrysectors.TrainingComponentIndustrySector", "array", "`industrysectors.TrainingComponentIndustrySector`", "string"), ("occupations.TrainingComponentOccupation", "array", "`occupations.TrainingComponentOccupation`", "string"), ("parentcode", "string", "parentcode", "string"), ("parenttitle", "string", "parenttitle", "string"), ("releases.Release", "array", "`releases.Release`", "string"), ("usagerecommendations.UsageRecommendation", "array", "`usagerecommendations.UsageRecommendation`", "string"), ("tpdevelopercode", "string", "tpdevelopercode", "string")), caseSensitive = false, transformationContext = "applymapping1")
    val resolvechoice2 = applymapping1.resolveChoice(choiceOption = Some(ChoiceOption("make_cols")), transformationContext = "resolvechoice2")
    val dropnullfields3 = resolvechoice2.dropNulls(transformationContext = "dropnullfields3")
    val datasink4 = glueContext.getJDBCSink(catalogConnection = "redshift", options = JsonOptions("""{"dbtable": "trainingcomponentservicegetdetails", "database": "dbanasightmla"}"""), redshiftTmpDir = args("TempDir"), transformationContext = "datasink4").writeDynamicFrame(dropnullfields3)

    Job.commit()
  }
}

I found this, but I am unable to make it work:

https://github.com/databricks/spark-redshift

val columnLengthMap = Map(
  "attachments" -> 4000,
  "classifications.Classification" -> 4000,
  "code" -> 4000,
  "completionmapping.NrtCompletion" -> 4000,
  "componenttype" -> 4000,
  "contacts.Contact" -> 4000,
  "createddate.DateTime" -> 4000,
  "createddate.OffsetMinutes" -> 4000,
  "currencyperiods.NrtCurrencyPeriod" -> 4000,
  "currencystatus" -> 4000,
  "datamanagers.DataManagerAssignment" -> 4000,
  "`datamanagers.DataManagerAssignment`" -> 4000,
  "islegacydata" -> 4000,
  "isstreamlined" -> 4000,
  "mappinginformation.Mapping" -> 4000,
  "recognitionmanagers.RecognitionManagerAssignment" -> 4000,
  "restrictions" -> 4000,
  "reversemappinginformation.Mapping" -> 4000,
  "title" -> 4000,
  "updateddate.DateTime" -> 4000,
  "updateddate.OffsetMinutes" -> 4000,
  "_code" -> 4000,
  "_salesforceid" -> 4000,
  "_api" -> 4000,
  "_timestamp" -> 4000,
  "industrysectors.TrainingComponentIndustrySector" -> 4000,
  "occupations.TrainingComponentOccupation" -> 4000,
  "parentcode" -> 4000,
  "parenttitle" -> 4000,
  "releases.Release" -> 4000,
  "usagerecommendations.UsageRecommendation" -> 4000,
  "tpdevelopercode" -> 4000)


// Read the existing table via spark-redshift. "url" and "dbtable" are the option
// names the connector expects; df is a var because it is reassigned below.
var df: DataFrame = sqlContext.read
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://anasight-redshift-mla.cf2ow8sevrix.ap-southeast-2.redshift.amazonaws.com:5439/dbanasightmla")
  .option("dbtable", "trainingcomponentservicegetdetails")
  .option("tempdir", "s3://redshift-anasight-2018/EMPLOYMENT")
  .load()

// Attach a "maxlength" metadata hint to each column so spark-redshift generates
// wider VARCHAR columns instead of the default VARCHAR(256). Column names that
// contain dots may need to be wrapped in backticks when resolved.
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}
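
From the spark-redshift README, my understanding is that the maxlength metadata only takes effect when the DataFrame is written back to Redshift, so I assume a write step roughly like the following is also needed (a sketch only; the overwrite mode and the reuse of the same endpoint and temp dir are my assumptions):

// Sketch: write the annotated DataFrame back through spark-redshift so the
// "maxlength" metadata is applied when the target table's DDL is generated.
// The url, dbtable and tempdir values are reused from the read above.
df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://anasight-redshift-mla.cf2ow8sevrix.ap-southeast-2.redshift.amazonaws.com:5439/dbanasightmla")
  .option("dbtable", "trainingcomponentservicegetdetails")
  .option("tempdir", "s3://redshift-anasight-2018/EMPLOYMENT")
  .mode("overwrite")
  .save()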

1 Answer


According to this Redshift document, "The maximum size of a single row loaded by using the COPY command is 4 MB. For more information, see COPY in the Amazon Redshift Database Developer Guide." I believe that at some point the record size is exceeding the 4 MB limit. Can you check the record size?

https://docs.aws.amazon.com/redshift/latest/mgmt/amazon-redshift-limits.html
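
As a rough way to check, something like the sketch below (assuming the datasource0 DynamicFrame from your script) serialises each row to JSON and reports the largest one; this only approximates the row size that COPY actually loads:

import org.apache.spark.sql.functions.{length, max, struct, to_json}

// Approximate the largest record: serialise every row to a JSON string and take
// the maximum character length. A character count only roughly tracks bytes.
val df = datasource0.toDF()
val maxRowChars = df
  .select(max(length(to_json(struct(df.columns.map(df(_)): _*)))).as("max_row_chars"))
  .first()
  .getAs[Int]("max_row_chars")
println(s"Largest serialised row: $maxRowChars characters (COPY allows at most 4 MB per row)")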

I think the row size limit is on the Redshift side, and I believe Glue connections also use a COPY command internally; see the AWS documentation linked here.
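
Since the writes go through COPY, COPY-level options can be passed through the Glue Redshift connection options, for example extracopyoptions. The snippet below is only a sketch based on the datasink4 call in your question; TRUNCATECOLUMNS makes COPY cut oversized strings to the column width instead of failing with error 1204, which silently loses data, so use it only if truncation is acceptable:

// Sketch: append extra COPY options via the Glue Redshift connection options.
val datasink4 = glueContext.getJDBCSink(
    catalogConnection = "redshift",
    options = JsonOptions("""{"dbtable": "trainingcomponentservicegetdetails", "database": "dbanasightmla", "extracopyoptions": "TRUNCATECOLUMNS"}"""),
    redshiftTmpDir = args("TempDir"),
    transformationContext = "datasink4"
  ).writeDynamicFrame(dropnullfields3)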

Please refer to this discussion in the AWS forums as well: https://forums.aws.amazon.com/thread.jspa?threadID=150345

Reference for DMS S3 to Redshift