I am trying to create a JavaRDD that contains another series of RDDs inside it.
RDDMachine.foreach(machine -> startDetectionNow()): inside that call, each machine runs a query against Elasticsearch and gets back another RDD. I collect all of it (about 1200 hits) and convert it to lists, and the machine then works with those lists.
First of all: is it possible to do this or not? If not, how else could I approach it?
Let me show what I am trying to do:
SparkConf conf = new SparkConf().setAppName("Algo").setMaster("local");
conf.set("es.index.auto.create", "true");
conf.set("es.nodes", "IP_ES");
conf.set("es.port", "9200");
sparkContext = new JavaSparkContext(conf);
MyAlgoConfig config_algo = new MyAlgoConfig(Detection.byPrevisionMerge);
Machine m1 = new Machine("AL-27", "IP1", config_algo);
Machine m2 = new Machine("AL-20", "IP2", config_algo);
Machine m3 = new Machine("AL-24", "IP3", config_algo);
Machine m4 = new Machine("AL-21", "IP4", config_algo);
ArrayList<Machine> Machines = new ArrayList<>();
Machines.add(m1);
Machines.add(m2);
Machines.add(m3);
Machines.add(m4);
JavaRDD<Machine> machineRDD = sparkContext.parallelize(Machines);
machineRDD.foreach(machine -> machine.startDetectNow());
On each machine I try to start my algorithm, which has to learn from data stored in Elasticsearch:
public boolean startDetectNow()
{
    // big Elasticsearch query for this machine's data
    JavaRDD dataForLearn = Elastic.loadElasticsearch(
            Algo.sparkContext
            , "logstash-*/Collector"
            , Elastic.req_AvgOfCall(
                    getIP()
                    , "hour"
                    , "2016-04-16T00:00:00"
                    , "2016-06-10T00:00:00"));
    JavaRDD<Hit> RDD_hits = Elastic.mapToHit(dataForLearn);
    List<Hit> hits = Elastic.RddToListHits(RDD_hits);
    // ... the machine then learns from this list (rest of the method omitted)
    return true;
}
So, inside every "Machine", I try to fetch all the data returned by a query. My question is: is this possible with Spark at all, or should I do it some other way? When I run it with Spark, everything seems to lock up as soon as the code reaches the second RDD (one alternative I am considering is sketched after the log below).
The error message is:
16/08/17 00:17:13 INFO SparkContext: Starting job: collect at Elastic.java:94
16/08/17 00:17:13 INFO DAGScheduler: Got job 1 (collect at Elastic.java:94) with 1 output partitions
16/08/17 00:17:13 INFO DAGScheduler: Final stage: ResultStage 1 (collect at Elastic.java:94)
16/08/17 00:17:13 INFO DAGScheduler: Parents of final stage: List()
16/08/17 00:17:13 INFO DAGScheduler: Missing parents: List()
16/08/17 00:17:13 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at map at Elastic.java:106), which has no missing parents
16/08/17 00:17:13 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.3 KB, free 7.7 KB)
16/08/17 00:17:13 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.5 KB, free 10.2 KB)
16/08/17 00:17:13 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:46356 (size: 2.5 KB, free: 511.1 MB)
16/08/17 00:17:13 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/08/17 00:17:13 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at map at Elastic.java:106)
16/08/17 00:17:13 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
^C16/08/17 00:17:22 INFO SparkContext: Invoking stop() from shutdown hook
16/08/17 00:17:22 INFO SparkUI: Stopped Spark web UI at http://192.168.10.23:4040
16/08/17 00:17:22 INFO DAGScheduler: ResultStage 0 (foreach at GuardConnect.java:60) failed in 10,292 s
16/08/17 00:17:22 INFO DAGScheduler: Job 0 failed: foreach at GuardConnect.java:60, took 10,470974 s
Exception in thread "main" org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581)
    at org.apache.spark.SparkContext$$anonfun$stop$9.apply$mcV$sp(SparkContext.scala:1740)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1739)
    at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:596)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:267)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:239)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:239)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:218)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
    at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:332)
    at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:46)
    at com.seigneurin.spark.GuardConnect.main(GuardConnect.java:60)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
16/08/17 00:17:22 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@4a7e0846)
16/08/17 00:17:22 INFO DAGScheduler: ResultStage 1 (collect at Elastic.java:94) failed in 9,301 s
16/08/17 00:17:22 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@6c6b4cb8)
16/08/17 00:17:22 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(0,1471385842813,JobFailed(org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down))
16/08/17 00:17:22 INFO DAGScheduler: Job 1 failed: collect at Elastic.java:94, took 9,317650 s
16/08/17 00:17:22 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Job 1 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581)
    at org.apache.spark.SparkContext$$anonfun$stop$9.apply$mcV$sp(SparkContext.scala:1740)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1739)
    at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:596)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:267)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:239)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:239)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:218)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
    at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:339)
    at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:46)
    at com.seigneurin.spark.Elastic.RddToListHits(Elastic.java:94)
    at com.seigneurin.spark.OXO.prepareDataAndLearn(OXO.java:126)
    at com.seigneurin.spark.OXO.startDetectNow(OXO.java:148)
    at com.seigneurin.spark.GuardConnect.lambda$main$1282d8df$1(GuardConnect.java:60)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:332)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:332)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/08/17 00:17:22 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(1,1471385842814,JobFailed(org.apache.spark.SparkException: Job 1 cancelled because SparkContext was shut down))
16/08/17 00:17:22 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/08/17 00:17:22 INFO MemoryStore: MemoryStore cleared
16/08/17 00:17:22 INFO BlockManager: BlockManager stopped
16/08/17 00:17:22 INFO BlockManagerMaster: BlockManagerMaster stopped
16/08/17 00:17:22 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/08/17 00:17:22 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/08/17 00:17:22 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/08/17 00:17:22 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,ANY, 6751 bytes)
16/08/17 00:17:22 ERROR Inbox: Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@65fd4104 rejected from java.util.concurrent.ThreadPoolExecutor@4387a1bf[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at org.apache.spark.executor.Executor.launchTask(Executor.scala:128)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$reviveOffers$1.apply(LocalBackend.scala:86)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$reviveOffers$1.apply(LocalBackend.scala:84)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalBackend.scala:84)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:69)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/08/17 00:17:22 INFO SparkContext: Successfully stopped SparkContext
16/08/17 00:17:22 INFO ShutdownHookManager: Shutdown hook called
16/08/17 00:17:22 INFO ShutdownHookManager: Deleting directory /tmp/spark-8bf65e78-a916-4cc0-b4d1-1b0ec9a07157
16/08/17 00:17:22 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
16/08/17 00:17:22 INFO ShutdownHookManager: Deleting directory /tmp/spark-8bf65e78-a916-4cc0-b4d1-1b0ec9a07157/httpd-6d3aeb80-808c-4749-8f8b-ac9341f990a7
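In case it helps, here is the alternative I am considering if nested RDDs are not allowed: drop the outer RDD of machines, keep them in a plain list on the driver, and run the Elasticsearch query for each machine one after the other, so only one RDD is in play at a time. This is only a rough sketch that reuses my own helpers (Elastic.loadElasticsearch, Elastic.mapToHit, Elastic.RddToListHits); the learnFrom method on Machine is hypothetical and stands for whatever startDetectNow() does with the list.
// Sketch: loop over the machines on the driver instead of putting them in an RDD
for (Machine machine : Machines) {
    // one Elasticsearch query per machine, submitted from the driver
    JavaRDD dataForLearn = Elastic.loadElasticsearch(
            sparkContext
            , "logstash-*/Collector"
            , Elastic.req_AvgOfCall(
                    machine.getIP()
                    , "hour"
                    , "2016-04-16T00:00:00"
                    , "2016-06-10T00:00:00"));
    JavaRDD<Hit> rddHits = Elastic.mapToHit(dataForLearn);
    List<Hit> hits = Elastic.RddToListHits(rddHits);
    machine.learnFrom(hits); // hypothetical: the machine works with the collected list
}
Would that be the right way to do it, or is there a way to keep the machines themselves distributed?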
Thanks if you can give me some advice.