5
votes

We're evaluating AWS Glue for a big data project, with some ETL. We added a crawler, which is correctly picking up a CSV file from S3. Initially, we simply want to transform that CSV to JSON, and drop the file in another S3 location (same bucket, different path).

We used the script as provided by AWS (no custom script here). And just mapped all the columns.

The target folder is empty (job has been just created), but the job fails with "File already exists": snapshot here. The S3 location were we pretend to drop the output was empty before starting the job. However after the error we do see two files, but those seems to be partials: snapshot

Any ideas on what might be going on?

Here's the fully stack:

    Container: container_1513099821372_0007_01_000001 on ip-172-31-49-38.ec2.internal_8041
    LogType:stdout
    Log Upload Time:Tue Dec 12 19:12:04 +0000 2017
    LogLength:8462
    Log Contents:
    Traceback (most recent call last):
    File "script_2017-12-12-19-11-08.py", line 30, in 
    datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options =
    {
        "path": "s3://primero-viz/output/tcw_entries"
    }
    , format = "json", transformation_ctx = "datasink2")
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/dynamicframe.py", line 523, in from_options
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/context.py", line 175, in write_dynamic_frame_from_options
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/context.py", line 198, in write_from_options
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/data_sink.py", line 32, in write
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/data_sink.py", line 28, in writeFrame
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o86.pyWriteDynamicFrame.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-172-31-63-141.ec2.internal, executor 1): java.io.IOException: File already exists:s3://primero-viz/output/tcw_entries/run-1513105898742-part-r-00000
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:604)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:915)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:896)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:793)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:176)
    at com.amazonaws.services.glue.hadoop.TapeOutputFormat.getRecordWriter(TapeOutputFormat.scala:65)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

    Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1158)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1005)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:996)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:996)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:996)
    at com.amazonaws.services.glue.HadoopDataSink$$anonfun$2.apply$mcV$sp(DataSink.scala:192)
    at com.amazonaws.services.glue.HadoopDataSink.writeDynamicFrame(DataSink.scala:202)
    at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:48)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.IOException: File already exists:s3://primero-viz/output/tcw_entries/run-1513105898742-part-r-00000
2

2 Answers

0
votes

The target folder is empty

Empty is not the same as not exist. It doesn't look like write_dynamic_frame supports write modes so might have to drop the directory first.

0
votes

Setup the write mode to "append" whether your load is incremental or "overwrite" if it's full load.

One example could be:

events.toDF().write.json(events_dir, mode="append", partitionBy=["partition_0", "partition_1"])