2 votes

I'm running a PySpark script and hit the error below. The traceback seems to say that "RuntimeError: Set changed size during iteration" is raised from my line "if len(rdd.take(1)) > 0:". I'm not sure that's the real cause and would like to know what exactly went wrong. Any help would be greatly appreciated.

Thanks!

17/03/23 21:54:17 INFO DStreamGraph: Updated checkpoint data for time 1490320070000 ms
17/03/23 21:54:17 INFO JobScheduler: Finished job streaming job 1490320072000 ms.0 from job set of time 1490320072000 ms
17/03/23 21:54:17 INFO JobScheduler: Starting job streaming job 1490320072000 ms.1 from job set of time 1490320072000 ms
17/03/23 21:54:17 ERROR JobScheduler: Error running job streaming job 1490320072000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/util.py",

line 65, in call r = self.func(t, *rdds) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in func = lambda t, rdd: old_func(rdd) File "/home/richard/Documents/spark_code/with_kafka/./mongo_kafka_spark_script.py", line 96, in _compute_glb_max if len(rdd.take(1)) > 0: File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1343, in take res = self.context.runJob(self, takeUpToNumLeft, p) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 965, in runJob port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2439, in _jrdd self._jrdd_deserializer, profiler) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2372, in _wrap_function pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2363, in _prepare_for_python_RDD broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars] RuntimeError: Set changed size during iteration

  at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
  at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
  at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
  at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Traceback (most recent call last):
  File "/home/richard/Documents/spark_code/with_kafka/./mongo_kafka_spark_script.py",

line 224, in ssc.awaitTermination(); File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 206, in awaitTermination File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in call File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o38.awaitTermination. : org.apache.spark.SparkException: An exception was raised by Python: Traceback (most recent call last): File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in call r = self.func(t, *rdds) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in func = lambda t, rdd: old_func(rdd) File "/home/richard/Documents/spark_code/with_kafka/./mongo_kafka_spark_script.py", line 96, in _compute_glb_max if len(rdd.take(1)) > 0: File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1343, in take res = self.context.runJob(self, takeUpToNumLeft, p) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 965, in runJob port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2439, in _jrdd self._jrdd_deserializer, profiler) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2372, in _wrap_function pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2363, in _prepare_for_python_RDD broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars] RuntimeError: Set changed size during iteration

  at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
  at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
  at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
  at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
I changed that part of the code to "if not rdd.isEmpty():" and got the same error. – rxie
"RuntimeError: Set changed size during iteration" means you are working with an object of type set and it changed size while you were iterating over it. This raises an error because a set must not be modified while it is being iterated. Hopefully you can generalize from this similar question: stackoverflow.com/questions/22846719/… – semore_1267
Thanks for your suggestion! – rxie
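
For context, the error in the comment above is easy to reproduce outside Spark. A minimal sketch (not from the original script) that mutates a set while iterating over it:

    # Mutating a set while a loop is iterating over it raises the
    # same RuntimeError the traceback above shows.
    s = {1, 2, 3}
    for x in s:
        s.add(x + 10)  # grows the set mid-iteration
    # RuntimeError: Set changed size during iteration

In the traceback, the iteration runs over sc._pickled_broadcast_vars inside _prepare_for_python_RDD, which suggests a broadcast variable was registered on another thread while the streaming job was being serialized.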

2 Answers

2 votes

Creating broadcast variables inside the streaming iteration (once per batch) does not seem to be best practice. Whenever stateful data has to be carried across batches, use updateStateByKey if possible.
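
A minimal sketch of what that could look like for a running global maximum, assuming a DStream of numeric values; the names (stream, the "glb_max" key, the checkpoint path) are illustrative, not from the asker's script:

    # updateStateByKey carries state across batches, so no broadcast
    # variable has to be rebuilt on every iteration.
    def update_max(new_values, current_max):
        # new_values: values seen for this key in the current batch;
        # current_max: state from previous batches (None at first).
        candidates = list(new_values)
        if current_max is not None:
            candidates.append(current_max)
        return max(candidates) if candidates else current_max

    # Stateful operations require a checkpoint directory on the context.
    ssc.checkpoint("hdfs:///tmp/checkpoints")  # illustrative path

    running_max = stream.map(lambda v: ("glb_max", v)) \
                        .updateStateByKey(update_max)
    running_max.pprint()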

1 vote

Try checking for an empty batch with

    if rdd.count() < 1:

instead. take() can throw exceptions, but with more details we could have pinpointed the error.
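
For concreteness, a sketch of how such a guard might sit inside the handler; _compute_glb_max is the function named in the traceback, but its body here is invented:

    def _compute_glb_max(rdd):
        # take(1), count() and isEmpty() all launch a Spark job, so
        # none of them avoids the serialization path in the traceback.
        if rdd.isEmpty():
            return
        current_max = rdd.max()  # work with the non-empty batch
        print(current_max)

Note that the asker reports the same failure after switching to isEmpty(), which suggests the emptiness check merely triggers the job, and the broadcast-variable race described in the other answer is the actual culprit.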