4 votes

Please tell me how to solve the following problem.

First, I confirmed that the following code runs when the master is "local".

Then I started two EC2 instances (m1.large). However, when the master is "spark://MASTER_PUBLIC_DNS:7077", a warning from "TaskSchedulerImpl" appears and the job fails.

When I change the master from the valid address to an invalid one (spark://INVALID_DNS:7077), the same message appears.

Namely,"WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory"

As that message suggests, I assigned 12 GB of memory to this cluster, but it still fails. The code is as follows:

#!/usr/bin/env python                                                                                     
# -*- coding: utf-8 -*- 
from pyspark import SparkContext, SparkConf 
from pyspark.mllib.classification import LogisticRegressionWithSGD 
from pyspark.mllib.regression import LabeledPoint 
from numpy import array 

# Load and parse the data 
def parsePoint(line): 
  values = [float(x) for x in line.split(' ')] 
  return LabeledPoint(values[0], values[1:]) 
appName = "testsparkapp" 
master = "spark://MASTER_PUBLIC_DNS:7077" 
#master = "local" 


conf = SparkConf().setAppName(appName).setMaster(master) 
sc = SparkContext(conf=conf) 

data = sc.textFile("/root/spark/mllib/data/sample_svm_data.txt") 
parsedData = data.map(parsePoint) 

# Build the model 
model = LogisticRegressionWithSGD.train(parsedData) 

# Evaluating the model on training data 
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features))) 
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count()) 
print("Training Error = " + str(trainErr))     

Additional information

I did three things that a friend advised me to do.

1. I opened the master port, 7077.

2. In the master URL, I set the host name, not the IP address.

->There fore,I became to able to connect master server(I checked it by Cluster UI).

3. I tried to set worker_max_heap as follows, but it probably fails (see the corrected sketch after the logs below).

SparkConf().set("spark.executor.memory", "4g").set("worker_max_heapsize", "2g")

The worker allows me to use 6.3 GB (I checked this in the UI). The instance type is m1.large.

->I recognized a warning in my executing log and an error in a worker stderr.

My execution log:

14/08/08 06:11:59 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

Worker stderr:

14/08/08 06:14:04 INFO worker.WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@PRIVATE_HOST_NAME1:52011/user/Worker
14/08/08 06:15:07 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@PRIVATE_HOST_NAME1:52201] -> [akka.tcp://spark@PRIVATE_HOST_NAME2:38286] disassociated! Shutting down.
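
For reference, here is a sketch of how I believe that memory setting would normally be applied — assuming the standard spark.executor.memory property, and that it only takes effect when set on the same SparkConf that is passed to SparkContext (the master URL below is a placeholder):

from pyspark import SparkContext, SparkConf

# Sketch: apply the executor memory setting to the conf that actually builds the context.
# "spark.executor.memory" is a standard Spark property; the master URL is a placeholder.
conf = (SparkConf()
        .setAppName("testsparkapp")
        .setMaster("spark://MASTER_PUBLIC_DNS:7077")
        .set("spark.executor.memory", "2g"))
sc = SparkContext(conf=conf)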

1 Answer

5 votes

The spark-ec2 script configures the Spark cluster on EC2 as standalone, which means it cannot work with remote submits. I struggled with the same error you describe for days before figuring out that it isn't supported. The error message is, unfortunately, misleading.

So you have to copy your files to the master, log in, and execute your Spark task there.
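
For example (just a sketch, with placeholder names): copy the script to the master (e.g. with scp), SSH in, and run it there — for instance with the spark-submit that spark-ec2 installs under /root/spark. The driver-side setup stays essentially the same:

# Sketch of the driver-side setup when the script is run on the EC2 master itself
# (for example via /root/spark/bin/spark-submit testsparkapp.py).
# "spark://MASTER_DNS:7077" is a placeholder for the address shown in the cluster UI.
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("testsparkapp").setMaster("spark://MASTER_DNS:7077")
sc = SparkContext(conf=conf)

data = sc.textFile("/root/spark/mllib/data/sample_svm_data.txt")
print(data.count())  # a trivial action to confirm the cluster accepts the job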