I'm running a PySpark script and encountered the error below. It seems to say that "RuntimeError: Set changed size during iteration" was raised from my line of code "if len(rdd.take(1)) > 0:", but I'm not sure whether that's the real cause and wonder what exactly went wrong. Any help will be greatly appreciated.
thanks!
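For context, the relevant shape of my script looks roughly like this (simplified sketch; only _compute_glb_max, the rdd.take(1) check, and ssc.awaitTermination() come from the traceback below, everything else is schematic):

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="mongo_kafka_spark")
ssc = StreamingContext(sc, 2)  # 2-second batches, matching the job timestamps in the log

stream = ...  # DStream created from Kafka (setup omitted)

def _compute_glb_max(rdd):
    # line 96 in the traceback: only process non-empty batches
    if len(rdd.take(1)) > 0:
        pass  # compute and persist the global max here

stream.foreachRDD(_compute_glb_max)

ssc.start()
ssc.awaitTermination()  # line 224 in the traceback
```

The full error output follows.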
```
17/03/23 21:54:17 INFO DStreamGraph: Updated checkpoint data for time 1490320070000 ms
17/03/23 21:54:17 INFO JobScheduler: Finished job streaming job 1490320072000 ms.0 from job set of time 1490320072000 ms
17/03/23 21:54:17 INFO JobScheduler: Starting job streaming job 1490320072000 ms.1 from job set of time 1490320072000 ms
17/03/23 21:54:17 ERROR JobScheduler: Error running job streaming job 1490320072000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in __call__
    r = self.func(t, *rdds)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in <lambda>
    func = lambda t, rdd: old_func(rdd)
  File "/home/richard/Documents/spark_code/with_kafka/./mongo_kafka_spark_script.py", line 96, in _compute_glb_max
    if len(rdd.take(1)) > 0:
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1343, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 965, in runJob
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2439, in _jrdd
    self._jrdd_deserializer, profiler)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2372, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2363, in _prepare_for_python_RDD
    broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars]
RuntimeError: Set changed size during iteration

        at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
        at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
        at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
        at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Traceback (most recent call last):
  File "/home/richard/Documents/spark_code/with_kafka/./mongo_kafka_spark_script.py", line 224, in <module>
    ssc.awaitTermination();
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 206, in awaitTermination
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o38.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
    [... identical Python traceback and JVM stack trace as above ...]
```
RuntimeError: Set changed size during iteration
means that you are iterating over an object of type set while something else is adding elements to it or removing elements from it. Python raises this error because a set is not allowed to change size while it is being iterated over. Here the set in question is sc._pickled_broadcast_vars, per the last frame of the traceback. Hopefully you can generalize from this similar question: stackoverflow.com/questions/22846719/… – semore_1267
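The error itself is plain CPython behavior and easy to reproduce outside Spark; a minimal example (no Spark involved):

```python
s = {1, 2, 3}
try:
    for x in s:
        s.add(x + 10)  # grows the set mid-iteration
except RuntimeError as e:
    print(e)  # -> Set changed size during iteration

# Common workaround: iterate over a snapshot instead of the live set.
for x in list(s):
    s.discard(x)  # safe: we iterate over the list() copy and mutate the original
```

In the Spark trace the set being iterated is sc._pickled_broadcast_vars on the driver, so a plausible (though unconfirmed) analogue is two concurrently running streaming jobs registering or clearing broadcast variables at the same time that _prepare_for_python_RDD walks that set.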