2 votes

I read a text file using Spark and saved it in a JavaRDD, and I am trying to print the data stored in that RDD. I am running my code on a cluster with one master and two slaves, but while iterating through the RDD I get an exception saying that containers exceed thresholds. The same code runs perfectly in standalone mode.

My Code:

import java.util.Iterator;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

SparkContext sc = new SparkContext("spark://master.com:7077", "Spark-Phoenix");
JavaSparkContext jsc = new JavaSparkContext(sc);

JavaRDD<String> trs_testing = jsc.textFile("hdfs://master.com:9000/Table/sample");


//using iterator
Iterator<String> iStr= trs_testing.toLocalIterator();

while(iStr.hasNext()){ //here I am getting exception    
    System.out.println("itr next : " + iStr.next());        
}   

//using foreach()
trs_testing.foreach(new VoidFunction<String>() {//here I am getting exception
        private static final long serialVersionUID = 1L;

        @Override public void call(String line) throws Exception {
            System.out.println(line);       
        }           
});

//using collect()
for(String line:trs_testing.collect()){//here I am getting exception 
    System.out.println(line);
}

//using foreachPartition()
trs_testing.foreachPartition(new VoidFunction<Iterator<String>>() { //here I am getting exception
        private static final long serialVersionUID = 1L;

        @Override public void call(Iterator<String> arg0) throws Exception {
            while (arg0.hasNext()) {
                String line = arg0.next();
                System.out.println(line);
            }
        }
});

Exception:

ERROR TaskSchedulerImpl Lost executor 0 on master.com: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
ERROR TaskSchedulerImpl Lost executor 1 on slave1.com: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
ERROR TaskSchedulerImpl Lost executor 2 on master.com: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
ERROR TaskSchedulerImpl Lost executor 3 on slave2.com: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
ERROR TaskSetManager Task 0 in stage 0.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, slave1.com): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1324)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1298)
    at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1338)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1337)
    at com.test.java.InsertE.main(InsertE.java:147)


2 Answers

0 votes

When you run the Spark job on a local system or in standalone mode, all the data lives on the same machine, so you are able to iterate over it and print it.

When the Spark job runs in a cluster environment, the data is split into fragments and distributed across all the machines in the cluster (RDD stands for Resilient Distributed Dataset). Hence, to print it in this manner, you have to use a foreach() function.

Try this:

trs_testing.foreach(new VoidFunction<String>() {
    @Override
    public void call(String line) {
        System.out.println(line);
    }
});
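
As a side note (not part of the original answer): System.out.println inside foreach() runs on the worker nodes, so the output ends up in each executor's stdout log rather than on the driver console. If the data is small enough to pull back to the driver, a minimal sketch like the following prints on the driver instead; take(20) is just an arbitrary sample size used for illustration.

// take(20) pulls at most 20 lines back to the driver, so this println runs locally
for (String line : trs_testing.take(20)) {
    System.out.println(line);
}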
0 votes

I found the solution. I was executing the code from my own machine, whereas my master and slaves were running on a remote server. I exported my code to the remote server, ran it there, and was finally able to process the data.
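
For reference, if the driver has to stay on a separate machine, one possible alternative is to pin the driver's address and port explicitly via spark.driver.host / spark.driver.port so the executors can connect back to it. This is only a minimal sketch, assuming the driver machine is actually routable from the cluster nodes and the chosen port is open; the host name and port below are placeholders.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
        .setAppName("Spark-Phoenix")
        .setMaster("spark://master.com:7077")
        // placeholder: an address of the driver machine that the workers can reach
        .set("spark.driver.host", "driver.reachable.host")
        // placeholder: a fixed port that is open in the firewall
        .set("spark.driver.port", "51810");
JavaSparkContext jsc = new JavaSparkContext(conf);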