I have a python 3 job in AWS a Glue (version 1.0) that has bookmarking enabled. This job transforms a json data source to parquet file format in an s3 bucket. This job run flawlessly the first time, or if I reset the bookmark.
Subsequent runs however, fail with the following error.
AnalysisException: '\nDatasource does not support writing empty or nested empty schemas.\nPlease make sure the data schema has at least one or more column(s).\n ;'
The script used is generated by the AWS console without any modifications, the source is json files in an S3 bucket using data catalog and the output is another bucket.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "segment", table_name = "segment_zlw54zvojf", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "segment", table_name = "segment_zlw54zvojf", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("channel", "string", "channel", "string"), ("context", "struct", "context", "struct"), ("event", "string", "event", "string"), ("integrations", "struct", "integrations", "struct"), ("messageid", "string", "messageid", "string"), ("projectid", "string", "projectid", "string"), ("properties", "struct", "properties", "struct"), ("receivedat", "string", "receivedat", "string"), ("timestamp", "string", "timestamp", "string"), ("type", "string", "type", "string"), ("userid", "string", "userid", "string"), ("version", "int", "version", "int"), ("anonymousid", "string", "anonymousid", "string"), ("partition_0", "string", "partition_0", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("channel", "string", "channel", "string"), ("context", "struct", "context", "struct"), ("event", "string", "event", "string"), ("integrations", "struct", "integrations", "struct"), ("messageid", "string", "messageid", "string"), ("projectid", "string", "projectid", "string"), ("properties", "struct", "properties", "struct"), ("receivedat", "string", "receivedat", "string"), ("timestamp", "string", "timestamp", "string"), ("type", "string", "type", "string"), ("userid", "string", "userid", "string"), ("version", "int", "version", "int"), ("anonymousid", "string", "anonymousid", "string"), ("partition_0", "string", "partition_0", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://mydestination.datalake.raw/segment/iterable"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://mydestination.datalake.raw/segment/iterable"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
Any advice would be greatly appreciated.