
I have a Python 3 job in AWS Glue (version 1.0) that has bookmarking enabled. The job transforms a JSON data source to Parquet format in an S3 bucket. It runs flawlessly the first time, or if I reset the bookmark.

Subsequent runs, however, fail with the following error:

    AnalysisException: '\nDatasource does not support writing empty or nested empty schemas.\nPlease make sure the data schema has at least one or more column(s).\n ;'

The script is generated by the AWS console without any modifications. The source is JSON files in an S3 bucket read through the Data Catalog, and the output is written to another bucket.

    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job

    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])

    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [database = "segment", table_name = "segment_zlw54zvojf", transformation_ctx = "datasource0"]
    ## @return: datasource0
    ## @inputs: []
    datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "segment", table_name = "segment_zlw54zvojf", transformation_ctx = "datasource0")
    ## @type: ApplyMapping
    ## @args: [mapping = [("channel", "string", "channel", "string"), ("context", "struct", "context", "struct"), ("event", "string", "event", "string"), ("integrations", "struct", "integrations", "struct"), ("messageid", "string", "messageid", "string"), ("projectid", "string", "projectid", "string"), ("properties", "struct", "properties", "struct"), ("receivedat", "string", "receivedat", "string"), ("timestamp", "string", "timestamp", "string"), ("type", "string", "type", "string"), ("userid", "string", "userid", "string"), ("version", "int", "version", "int"), ("anonymousid", "string", "anonymousid", "string"), ("partition_0", "string", "partition_0", "string")], transformation_ctx = "applymapping1"]
    ## @return: applymapping1
    ## @inputs: [frame = datasource0]
    applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("channel", "string", "channel", "string"), ("context", "struct", "context", "struct"), ("event", "string", "event", "string"), ("integrations", "struct", "integrations", "struct"), ("messageid", "string", "messageid", "string"), ("projectid", "string", "projectid", "string"), ("properties", "struct", "properties", "struct"), ("receivedat", "string", "receivedat", "string"), ("timestamp", "string", "timestamp", "string"), ("type", "string", "type", "string"), ("userid", "string", "userid", "string"), ("version", "int", "version", "int"), ("anonymousid", "string", "anonymousid", "string"), ("partition_0", "string", "partition_0", "string")], transformation_ctx = "applymapping1")
    ## @type: ResolveChoice
    ## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
    ## @return: resolvechoice2
    ## @inputs: [frame = applymapping1]
    resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
    ## @type: DropNullFields
    ## @args: [transformation_ctx = "dropnullfields3"]
    ## @return: dropnullfields3
    ## @inputs: [frame = resolvechoice2]
    dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
    ## @type: DataSink
    ## @args: [connection_type = "s3", connection_options = {"path": "s3://mydestination.datalake.raw/segment/iterable"}, format = "parquet", transformation_ctx = "datasink4"]
    ## @return: datasink4
    ## @inputs: [frame = dropnullfields3]
    datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://mydestination.datalake.raw/segment/iterable"}, format = "parquet", transformation_ctx = "datasink4")
    job.commit()

Any advice would be greatly appreciated.


1 Answer


So I got to the bottom of this issue.

My source S3 bucket has new data written to it daily. However, this data lands in new subfolders within the bucket.

For these new subfolders to be picked up by the AWS Glue job, I need to rerun the AWS Glue crawler so the source Data Catalog table is updated.

Without doing this, no new data is identified, and the default AWS-generated script attempts to write an empty dataset, which fails with the schema error above.
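As a side note, the failure can also be avoided defensively by checking whether the bookmarked read actually returned any records before writing. This is only a minimal sketch of that guard (not the fix I ultimately used); the frame and context names match the generated script above:

    # Sketch only: skip the Parquet write when bookmarks leave nothing new to process.
    if dropnullfields3.count() > 0:
        datasink4 = glueContext.write_dynamic_frame.from_options(
            frame = dropnullfields3,
            connection_type = "s3",
            connection_options = {"path": "s3://mydestination.datalake.raw/segment/iterable"},
            format = "parquet",
            transformation_ctx = "datasink4")
    else:
        print("Bookmark found no new records; skipping the write.")
    job.commit()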

To resolve this, I scheduled my crawler to run before the job executes.
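I did this with Glue's built-in scheduling, but the same ordering can also be expressed in a small driver script. The sketch below is one way to do it with boto3 (the crawler and job names are placeholders, not real resources): start the crawler so the new subfolders land in the Data Catalog, poll until it returns to the READY state, and only then start the bookmark-enabled job.

    import time
    import boto3

    glue = boto3.client("glue")
    CRAWLER_NAME = "segment-crawler"    # placeholder crawler name
    JOB_NAME = "segment-to-parquet"     # placeholder Glue job name

    # Run the crawler so newly created S3 subfolders are added to the Data Catalog.
    glue.start_crawler(Name=CRAWLER_NAME)

    # Poll until the crawler has finished and is READY again.
    while glue.get_crawler(Name=CRAWLER_NAME)["Crawler"]["State"] != "READY":
        time.sleep(30)

    # Only then start the bookmark-enabled Glue job.
    glue.start_job_run(JobName=JOB_NAME)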