2 votes

I have a Spark Structured Streaming job that died silently, without any explicit error message in the application logs. It ran well for about 10 hours, then started logging some non-fatal errors. It continued to produce results for about another day, and then the driver container died silently.

The job runs on a 3-node HDP (Hortonworks Data Platform) cluster in YARN cluster mode. It ingests data from Kafka, does some computation, and then writes the output to Kafka and HDFS.
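Roughly, the job has the following shape (this is only a simplified sketch for context; the topic names, paths, and options below are placeholders, not the real code):

// Simplified sketch only (Scala) - topics, paths, and options are placeholders.
// Requires the spark-sql-kafka-0-10 package on the classpath.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.current_timestamp

object StreamingJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("streaming-job").getOrCreate()

    // Ingest from Kafka.
    val input = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "input-topic")
      .load()

    // Stand-in for the real computation.
    val computed = input
      .selectExpr("CAST(value AS STRING) AS value")
      .withColumn("processed_at", current_timestamp())

    // Sink 1: back to Kafka (the Kafka sink needs a string/binary 'value' column).
    computed.selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("topic", "output-topic")
      .option("checkpointLocation", "/checkpoints/kafka-sink")
      .start()

    // Sink 2: to HDFS as Parquet.
    computed.writeStream
      .format("parquet")
      .option("path", "/data/output")
      .option("checkpointLocation", "/checkpoints/hdfs-sink")
      .start()

    spark.streams.awaitAnyTermination()
  }
}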

First I looked at the YARN application log for the driver container and found these error messages:

19/05/19 21:02:08 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[10.8.0.247:50010,DS-6502520b-5b78-408b-b18d-a99df4fb76ab,DISK], DatanodeInfoWithStorage[10.8.0.145:50010,DS-d8133dc8-cfaa-406d-845d-c819186c1450,DISK]], original=[DatanodeInfoWithStorage[10.8.0.247:50010,DS-6502520b-5b78-408b-b18d-a99df4fb76ab,DISK], DatanodeInfoWithStorage[10.8.0.145:50010,DS-d8133dc8-cfaa-406d-845d-c819186c1450,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.findNewDatanode(DFSOutputStream.java:1059)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1122)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1280)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1005)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:512)

End of LogType:stderr
***********************************************************************

The above is the last message of the driver.

It looks alarming, but the job kept producing results despite logging 36,628 of these errors in a day, so the error was not killing the job directly. HDFS itself also appears to be working.

Then I looked at the executor logs. The executors exited after the driver died, and their logs don't contain any error or exception:

19/05/19 21:02:09 ERROR CoarseGrainedExecutorBackend: Executor self-exiting due to : Driver ip-10-8-0-247.us-west-2.compute.internal:11269 disassociated! Shutting down.

I couldn't figure out the cause from this, so I looked at the YARN ResourceManager log and found these messages:

2019-05-19 18:36:44,047 INFO  availability.MetricSinkWriteShardHostnameHashingStrategy (MetricSinkWriteShardHostnameHashingStrategy.java:findCollectorShard(42)) - Calculated collector shard ip-10-8-0-145.us-west-2.compute.internal based on hostname: ip-10-8-0-145.us-west-2.compute.internal
2019-05-19 19:48:04,041 INFO  availability.MetricSinkWriteShardHostnameHashingStrategy (MetricSinkWriteShardHostnameHashingStrategy.java:findCollectorShard(42)) - Calculated collector shard ip-10-8-0-145.us-west-2.compute.internal based on hostname: ip-10-8-0-145.us-west-2.compute.internal
2019-05-19 21:02:08,797 INFO  rmcontainer.RMContainerImpl (RMContainerImpl.java:handle(422)) - container_e01_1557249464624_0669_01_000001 Container Transitioned from RUNNING to COMPLETED
2019-05-19 21:02:08,797 INFO  scheduler.SchedulerNode (SchedulerNode.java:releaseContainer(220)) - Released container container_e01_1557249464624_0669_01_000001 of capacity <memory:1024, vCores:1> on host ip-10-8-0-247.us-west-2.compute.internal:45454, which currently has 7 containers, <memory:19968, vCores:7> used and <memory:2560, vCores:1> available, release resources=true
2019-05-19 21:02:08,798 INFO  attempt.RMAppAttemptImpl (RMAppAttemptImpl.java:rememberTargetTransitionsAndStoreState(1209)) - Updating application attempt appattempt_1557249464624_0669_000001 with final state: FAILED, and exit status: -104
2019-05-19 21:02:08,798 INFO  attempt.RMAppAttemptImpl (RMAppAttemptImpl.java:handle(809)) - appattempt_1557249464624_0669_000001 State change from RUNNING to FINAL_SAVING
2019-05-19 21:02:08,798 INFO  integration.RMRegistryOperationsService (RMRegistryOperationsService.java:onContainerFinished(143)) - Container container_e01_1557249464624_0669_01_000001 finished, skipping purging container-level records (should be handled by AM)
2019-05-19 21:02:08,801 INFO  resourcemanager.ApplicationMasterService (ApplicationMasterService.java:unregisterAttempt(685)) - Unregistering app attempt : appattempt_1557249464624_0669_000001
2019-05-19 21:02:08,801 INFO  security.AMRMTokenSecretManager (AMRMTokenSecretManager.java:applicationMasterFinished(124)) - Application finished, removing password for appattempt_1557249464624_0669_000001
2019-05-19 21:02:08,801 INFO  attempt.RMAppAttemptImpl (RMAppAttemptImpl.java:handle(809)) - appattempt_1557249464624_0669_000001 State change from FINAL_SAVING to FAILED
2019-05-19 21:02:08,801 INFO  rmapp.RMAppImpl (RMAppImpl.java:transition(1331)) - The number of failed attempts is 1. The max attempts is 2
2019-05-19 21:02:08,801 INFO  rmapp.RMAppImpl (RMAppImpl.java:handle(779)) - application_1557249464624_0669 State change from RUNNING to ACCEPTED
2019-05-19 21:02:08,801 INFO  capacity.CapacityScheduler (CapacityScheduler.java:doneApplicationAttempt(812)) - Application Attempt appattempt_1557249464624_0669_000001 is done. finalState=FAILED

So it looks like YARN didn't kill the job either; the driver container suddenly transitioned from RUNNING to COMPLETED.

I expected to see an explicit message, such as an OOM error, explaining the crash, but now I'm confused as to why it exited silently. Is this related to the HDFS error? Is there any mechanism in Spark that silently stops the driver when there are too many exceptions, even non-fatal ones? Any advice is welcome, thanks!


2 Answers

0 votes

YARN exit status -104 means that the physical memory limit for that YARN container was exceeded:

Container terminated because of exceeding allocated physical memory limit.

Since you're running on AWS, you could use an instance type with more RAM for your driver node.
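For what it's worth, the ResourceManager log above shows the failed container had a capacity of <memory:1024, vCores:1>, i.e. roughly a 1 GB driver container, which is easy to exceed. Besides moving to a bigger instance, you could raise the driver container's allocation when submitting the job. This is a sketch only; the sizes below are placeholders, and on Spark versions before 2.3 the overhead setting is spelled spark.yarn.driver.memoryOverhead instead:

spark.driver.memory=4g
spark.driver.memoryOverhead=1g

These can go into spark-defaults.conf or be passed as --conf key=value options to spark-submit; YARN then allocates a driver container of roughly the sum of the two values.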

0 votes

Please see the following reference for details: Bad DataNode Failure Issue (Hortonworks).

Cause: This issue occurs when jobs run on small clusters (fewer than 5 DataNodes) with heavy write loads. If there is a DataNode or network failure in the write pipeline, the DFSClient tries to remove the failed DataNode from the pipeline and continues writing with the remaining DataNodes, so the number of DataNodes in the pipeline shrinks. The properties below control how the client reacts to this.

Solution: Relax the DataNode replacement policy. Set the following properties via Ambari > HDFS > Configs > Custom hdfs-site > Add Property (note that the enable property is a boolean, while the policy property accepts DEFAULT, ALWAYS, or NEVER):

dfs.client.block.write.replace-datanode-on-failure.enable=true
dfs.client.block.write.replace-datanode-on-failure.policy=NEVER
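Since these are HDFS client-side settings, you can also pass them to the Spark application itself rather than changing them cluster-wide. This is only a sketch of one way to do it: Spark forwards any property prefixed with spark.hadoop. into its Hadoop configuration, so the same keys from above become:

spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.enable=true
spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy=NEVER

With only three DataNodes there is no spare node to swap into the write pipeline anyway, so NEVER tells the client to keep writing to the surviving replicas instead of failing with the "Failed to replace a bad datanode" error.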