
I am trying to create a JavaRDD that contains another series of RDDs inside it.

RDDMachine.foreach(machine -> startDetectionNow())

Inside, each machine starts a query to Elasticsearch and gets another RDD. I collect all of this (1200 hits) and convert it to a List. The Machine then starts working with this list.

First of all: is it possible to do this or not? If not, how could I approach it differently?

Let me show what I am trying to do:

    SparkConf conf = new SparkConf().setAppName("Algo").setMaster("local");
    conf.set("es.index.auto.create", "true");
    conf.set("es.nodes", "IP_ES");
    conf.set("es.port", "9200");
    sparkContext = new JavaSparkContext(conf);

    MyAlgoConfig config_algo = new MyAlgoConfig(Detection.byPrevisionMerge);

    Machine m1 = new Machine("AL-27", "IP1", config_algo);
    Machine m2 = new Machine("AL-20", "IP2", config_algo);
    Machine m3 = new Machine("AL-24", "IP3", config_algo);
    Machine m4 = new Machine("AL-21", "IP4", config_algo);

    ArrayList<Machine> machines = new ArrayList<>();
    machines.add(m1);
    machines.add(m2);
    machines.add(m3);
    machines.add(m4);

    JavaRDD<Machine> machineRDD = sparkContext.parallelize(machines);

    machineRDD.foreach(machine -> machine.startDetectNow());

I am trying to start my algorithm on each machine, and each machine must learn from data located in Elasticsearch.


    public boolean startDetectNow() {

        // big Elasticsearch (ELK) query
        JavaRDD dataForLearn = Elastic.loadElasticsearch(
                Algo.sparkContext,
                "logstash-*/Collector",
                Elastic.req_AvgOfCall(
                        getIP(),
                        "hour",
                        "2016-04-16T00:00:00",
                        "2016-06-10T00:00:00"));

        JavaRDD<Hit> RDD_hits = Elastic.mapToHit(dataForLearn);
        List<Hit> hits = Elastic.RddToListHits(RDD_hits);
        // ... the rest of the algorithm then works with this list of hits

So I am trying to get all the data from a query in every "Machine". My question is: is it possible to do this with Spark, or maybe in some other way? When I run it in Spark, it seems to lock up when the code reaches the second RDD.

And the error message is:

16/08/17 00:17:13 INFO SparkContext: Starting job: collect at Elastic.java:94
16/08/17 00:17:13 INFO DAGScheduler: Got job 1 (collect at Elastic.java:94) with 1 output partitions
16/08/17 00:17:13 INFO DAGScheduler: Final stage: ResultStage 1 (collect at Elastic.java:94)
16/08/17 00:17:13 INFO DAGScheduler: Parents of final stage: List()
16/08/17 00:17:13 INFO DAGScheduler: Missing parents: List()
16/08/17 00:17:13 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at map at Elastic.java:106), which has no missing parents
16/08/17 00:17:13 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.3 KB, free 7.7 KB)
16/08/17 00:17:13 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.5 KB, free 10.2 KB)
16/08/17 00:17:13 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:46356 (size: 2.5 KB, free: 511.1 MB)
16/08/17 00:17:13 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/08/17 00:17:13 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at map at Elastic.java:106)
16/08/17 00:17:13 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
^C16/08/17 00:17:22 INFO SparkContext: Invoking stop() from shutdown hook
16/08/17 00:17:22 INFO SparkUI: Stopped Spark web UI at http://192.168.10.23:4040
16/08/17 00:17:22 INFO DAGScheduler: ResultStage 0 (foreach at GuardConnect.java:60) failed in 10,292 s
16/08/17 00:17:22 INFO DAGScheduler: Job 0 failed: foreach at GuardConnect.java:60, took 10,470974 s
Exception in thread "main" org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658) at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84) at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581) at org.apache.spark.SparkContext$$anonfun$stop$9.apply$mcV$sp(SparkContext.scala:1740) at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229) at org.apache.spark.SparkContext.stop(SparkContext.scala:1739) at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:596) at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:267) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:239) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:239) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:239) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:239) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:239) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:239) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:239) at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:218) at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.RDD.foreach(RDD.scala:910) at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:332) at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:46) at com.seigneurin.spark.GuardConnect.main(GuardConnect.java:60) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
16/08/17 00:17:22 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@4a7e0846)
16/08/17 00:17:22 INFO DAGScheduler: ResultStage 1 (collect at Elastic.java:94) failed in 9,301 s
16/08/17 00:17:22 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@6c6b4cb8)
16/08/17 00:17:22 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(0,1471385842813,JobFailed(org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down))
16/08/17 00:17:22 INFO DAGScheduler: Job 1 failed: collect at Elastic.java:94, took 9,317650 s
16/08/17 00:17:22 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) org.apache.spark.SparkException: Job 1 cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658) at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84) at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581) at org.apache.spark.SparkContext$$anonfun$stop$9.apply$mcV$sp(SparkContext.scala:1740) at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229) at org.apache.spark.SparkContext.stop(SparkContext.scala:1739) at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:596) at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:267) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:239) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:239) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:239) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:239) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:239) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:239) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:239) at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:218) at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.RDD.collect(RDD.scala:926) at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:339) at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:46) at com.seigneurin.spark.Elastic.RddToListHits(Elastic.java:94) at com.seigneurin.spark.OXO.prepareDataAndLearn(OXO.java:126) at com.seigneurin.spark.OXO.startDetectNow(OXO.java:148) at com.seigneurin.spark.GuardConnect.lambda$main$1282d8df$1(GuardConnect.java:60) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:332) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:332) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
16/08/17 00:17:22 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(1,1471385842814,JobFailed(org.apache.spark.SparkException: Job 1 cancelled because SparkContext was shut down))
16/08/17 00:17:22 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/08/17 00:17:22 INFO MemoryStore: MemoryStore cleared
16/08/17 00:17:22 INFO BlockManager: BlockManager stopped
16/08/17 00:17:22 INFO BlockManagerMaster: BlockManagerMaster stopped
16/08/17 00:17:22 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/08/17 00:17:22 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/08/17 00:17:22 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/08/17 00:17:22 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,ANY, 6751 bytes)
16/08/17 00:17:22 ERROR Inbox: Ignoring error java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@65fd4104 rejected from java.util.concurrent.ThreadPoolExecutor@4387a1bf[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at org.apache.spark.executor.Executor.launchTask(Executor.scala:128) at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$reviveOffers$1.apply(LocalBackend.scala:86) at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$reviveOffers$1.apply(LocalBackend.scala:84) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalBackend.scala:84) at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:69) at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116) at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204) at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
16/08/17 00:17:22 INFO SparkContext: Successfully stopped SparkContext
16/08/17 00:17:22 INFO ShutdownHookManager: Shutdown hook called
16/08/17 00:17:22 INFO ShutdownHookManager: Deleting directory /tmp/spark-8bf65e78-a916-4cc0-b4d1-1b0ec9a07157
16/08/17 00:17:22 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
16/08/17 00:17:22 INFO ShutdownHookManager: Deleting directory /tmp/spark-8bf65e78-a916-4cc0-b4d1-1b0ec9a07157/httpd-6d3aeb80-808c-4749-8f8b-ac9341f990a7

Thanks in advance for any advice you can give me.

We need the inner exception to help. All this tells us is that there's a problem with your foreach. – Yuval Itzchakov
Hmm, maybe it's because I have a while(1) just after the RDD<Hit>? I was thinking I could maybe thread the work with RDDs. I have added the full error message. – Vincent Vost
An RDD of RDDs does not really make sense, but yes, there is a way to trick the compiler into compiling it. – GameOfThrows

1 Answer


You cannot create an RDD inside another RDD, whatever the type of the RDD may be. This is the first rule. The reason is that an RDD is only an abstraction pointing to your distributed data: the operations you run on it are shipped to the worker nodes, and those workers have no SparkContext of their own, so they cannot build or collect further RDDs.
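
The usual workaround is to keep the list of machines on the driver and loop over it there: each Elasticsearch query then runs as an ordinary Spark job, and only plain Java objects (the collected hits) are handed to the machine. Below is a minimal sketch, reusing the helper names from your question (Elastic.loadElasticsearch, mapToHit, RddToListHits) and assuming Machine exposes getIP() plus a hypothetical startDetectNow(List<Hit>) variant that works on already-collected hits instead of querying Elasticsearch itself:

    // Driver-side loop instead of machineRDD.foreach(...).
    // Helper classes (Elastic, Machine, Hit) are the ones from the question;
    // startDetectNow(List<Hit>) is a hypothetical overload, not existing API.
    List<Machine> machines = Arrays.asList(m1, m2, m3, m4);

    for (Machine machine : machines) {
        // Each Elasticsearch query is now a normal Spark job started from the
        // driver, where the SparkContext actually exists.
        JavaRDD dataForLearn = Elastic.loadElasticsearch(
                sparkContext,
                "logstash-*/Collector",
                Elastic.req_AvgOfCall(machine.getIP(), "hour",
                        "2016-04-16T00:00:00", "2016-06-10T00:00:00"));

        // Collect the hits on the driver and hand the plain list to the machine.
        List<Hit> hits = Elastic.RddToListHits(Elastic.mapToHit(dataForLearn));
        machine.startDetectNow(hits);
    }

If the per-machine detection itself is heavy, the collected hits of one machine can still be distributed again with sparkContext.parallelize(hits), but that must also stay a driver-side call, never something executed inside another RDD's foreach.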