1
votes

On every slave node through Marathon we run Mesos External Shuffle Service . When we submit spark job via dcos CLI in coarse grained mode without dynamic allocation everything working as expected. But when we submit the same job with dynamic allocation it fails.

16/12/08 19:20:42 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.lang.RuntimeException: java.lang.RuntimeException: Failed to open file:/tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index
at   org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:234)
...
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
...
Caused by: java.io.FileNotFoundException: /tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index (No such file or directory)

Full description:

  • We installed Mesos (DCOS) with Marathon using Azure Portal.
  • Via Universe Packages we installed: Cassandra, Spark and Marathon-lb
  • We generated test data in Cassandra.
  • On laptop I installed dcos CLI

When I submit job as below everything is working as expected:

./dcos spark run --submit-args="--properties-file coarse-grained.conf --class portal.spark.cassandra.app.ProductModelPerNrOfAlerts http://marathon-lb-default.marathon.mesos:10018/jars/spark-cassandra-assembly-1.0.jar"
Run job succeeded. Submission id: driver-20161208185927-0043

success

cqlsh:sp> select count(*) from product_model_per_alerts_by_date ;

count
-------
476

coarse-grained.conf:

spark.cassandra.connection.host 10.32.0.17
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.executor.cores 1
spark.executor.memory 1g
spark.executor.instances 2
spark.submit.deployMode cluster
spark.cores.max 4

portal.spark.cassandra.app.ProductModelPerNrOfAlerts:

package portal.spark.cassandra.app

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

object ProductModelPerNrOfAlerts {
  def main(args: Array[String]): Unit = {

     val conf = new SparkConf(true)
                    .setAppName("cassandraSpark-ProductModelPerNrOfAlerts")

     val sc = new SparkContext(conf)

     val sqlContext = new SQLContext(sc)

     import sqlContext.implicits._

     val df = sqlContext
             .read
             .format("org.apache.spark.sql.cassandra")
             .options(Map("table" -> "asset_history", "keyspace" -> "sp"))
            .load()
            .select("datestamp","product_model","nr_of_alerts")

     val dr = df
           .groupBy("datestamp","product_model")
           .avg("nr_of_alerts")
           .toDF("datestamp","product_model","nr_of_alerts")

     dr.write
          .mode(SaveMode.Overwrite)
          .format("org.apache.spark.sql.cassandra")
          .options(Map("table" -> "product_model_per_alerts_by_date", "keyspace" -> "sp"))
          .save()


     sc.stop()
 }
}

Dynamic Allocation

Through Marathon we run Mesos External Shuffle Service:

{
  "id": "spark-mesos-external-shuffle-service-tt",
  "container": {
     "type": "DOCKER",
     "docker": {
        "image": "jpavt/mesos-spark-hadoop:mesos-external-shuffle-service-1.0.4-2.0.1",
        "network": "BRIDGE",
        "portMappings": [
           { "hostPort": 7337, "containerPort": 7337, "servicePort": 7337 }
         ],
       "forcePullImage":true,
       "volumes": [
         {
           "containerPath": "/tmp",
           "hostPath": "/tmp",
           "mode": "RW"
         }
       ]
     }
   },
   "instances": 9,
   "cpus": 0.2,
   "mem": 512,
   "constraints": [["hostname", "UNIQUE"]]
 }

Dockerfile for jpavt/mesos-spark-hadoop:mesos-external-shuffle-service-1.0.4-2.0.1:

FROM mesosphere/spark:1.0.4-2.0.1
WORKDIR /opt/spark/dist
ENTRYPOINT ["./bin/spark-class", "org.apache.spark.deploy.mesos.MesosExternalShuffleService"]

Now when I submit job with dynamic allocation it fails:

./dcos spark run --submit-args="--properties-file dynamic-allocation.conf --class portal.spark.cassandra.app.ProductModelPerNrOfAlerts http://marathon-lb-default.marathon.mesos:10018/jars/spark-cassandra-assembly-1.0.jar"
 Run job succeeded. Submission id: driver-20161208191958-0047

failure

select count(*) from product_model_per_alerts_by_date ;

count
-------
 5

dynamic-allocation.conf:

spark.cassandra.connection.host 10.32.0.17
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.executor.cores 1
spark.executor.memory 1g
spark.submit.deployMode cluster
spark.cores.max 4

spark.shuffle.service.enabled true
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.maxExecutors 5
spark.dynamicAllocation.cachedExecutorIdleTimeout 120s
spark.dynamicAllocation.schedulerBacklogTimeout 10s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 20s
spark.mesos.executor.docker.volumes /tmp:/tmp:rw
spark.local.dir /tmp

logs from mesos:

16/12/08 19:20:42 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 18.0 KB, free 366.0 MB)
16/12/08 19:20:42 INFO TorrentBroadcast: Reading broadcast variable 7 took 21 ms
16/12/08 19:20:42 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 38.6 KB, free 366.0 MB)
16/12/08 19:20:42 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 0, fetching them
16/12/08 19:20:42 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://[email protected]:45422)
16/12/08 19:20:42 INFO MapOutputTrackerWorker: Got the output locations
16/12/08 19:20:42 INFO ShuffleBlockFetcherIterator: Getting 4 non-empty blocks out of 58 blocks
16/12/08 19:20:42 INFO TransportClientFactory: Successfully created connection to /10.32.0.11:7337 after 2 ms (0 ms spent in bootstraps)
16/12/08 19:20:42 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 13 ms
16/12/08 19:20:42 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.lang.RuntimeException: java.lang.RuntimeException: Failed to open file: /tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index
at   org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:234)
...
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
...
 Caused by: java.io.FileNotFoundException: /tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index (No such file or directory)

logs from marathon spark-mesos-external-shuffle-service-tt:

...
16/12/08 19:20:29 INFO MesosExternalShuffleBlockHandler: Received registration request from app 704aec43-1aa3-4971-bb98-e892beeb2c45-0008-driver-20161208191958-0047 (remote address /10.32.0.4:49710, heartbeat timeout 120000 ms).
16/12/08 19:20:31 INFO ExternalShuffleBlockResolver: Registered executor AppExecId{appId=704aec43-1aa3-4971-bb98-e892beeb2c45-0008-driver-20161208191958-0047, execId=2} with ExecutorShuffleInfo{localDirs=[/tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2], subDirsPerLocalDir=64, shuffleManager=org.apache.spark.shuffle.sort.SortShuffleManager}
16/12/08 19:20:38 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 8157825166903585542
java.lang.RuntimeException: Failed to open file: /tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2/16/shuffle_0_55_0.index
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:234)
...
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
Caused by: java.io.FileNotFoundException: /tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2/16/shuffle_0_55_0.index (No such file or directory)
...

but file exists on given slave box:

$ ls -l /tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2/16/shuffle_0_55_0.index
-rw-r--r-- 1 root root 1608 Dec  8 19:20 /tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2/16/shuffle_0_55_0.index


 stat shuffle_0_55_0.index 
  File: 'shuffle_0_55_0.index'
  Size: 1608        Blocks: 8          IO Block: 4096   regular file
  Device: 801h/2049d    Inode: 1805493     Links: 1
  Access: (0644/-rw-r--r--)  Uid: (    0/    root)   Gid: (    0/    root)
  Access: 2016-12-08 19:20:38.163188836 +0000
  Modify: 2016-12-08 19:20:38.163188836 +0000
  Change: 2016-12-08 19:20:38.163188836 +0000
  Birth: -
2

2 Answers

1
votes

I am not familiar with DCOS, Marathon and Azure though, I use dynamic resource allocation(Mesos external shuffle service) on Mesos and Aurora with Docker.

  • Each Mesos agent node has its own external shuffle service (that is, one external shuffle service for one mesos agent) ?
  • spark.local.dir setting is exactly same string and pointing same directory ? Your spark.local.dir for shuffle service is /tmp though, I don't know DCOS setting.
  • spark.local.dir directory can be readable/writable for both ? If both mesos agent and external shuffle service are launched by docker, spark.local.dir on host MUST be mounted to both containers.

EDIT

  • If SPARK_LOCAL_DIRS (mesos or standalone) environment variable is set, spark.local.dir will be overridden.
0
votes

There was error in marathon external shuffle service config instead of path container.docker.volumes we should use container.volumes path.

Correct configuration:

{
  "id": "mesos-external-shuffle-service-simple",
  "container": {
     "type": "DOCKER",
     "docker": {
        "image": "jpavt/mesos-spark-hadoop:mesos-external-shuffle-service-1.0.4-2.0.1",
        "network": "BRIDGE",
        "portMappings": [
           { "hostPort": 7337, "containerPort": 7337, "servicePort": 7337 }
         ],
       "forcePullImage":true
     },
    "volumes": [
         {
           "containerPath": "/tmp",
           "hostPath": "/tmp",
           "mode": "RW"
         }
    ]
   },
   "instances": 9,
   "cpus": 0.2,
   "mem": 512,
   "constraints": [["hostname", "UNIQUE"]]
 }