I was reading the section about accumulators in the Spark documentation: http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators

I was trying to run the sample code in spark-shell:

  1. Download the Spark zip file
  2. Unzip it and cd into the directory
  3. Run ./bin/spark-shell

The example from the docs, with its expected output:
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

But no luck: I tried Spark 2.4.7 and Spark 3.0.1, and both throw an exception. Could you tell me why?

This is Spark 2:

Spark context available as 'sc' (master = local[*], app id = local-1609254493356).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.7
      /_/
         
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 15.0.1)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))

  at org.apache.spark.util.FieldAccessFinder$$anon$4$$anonfun$visitMethodInsn$7.apply(ClosureCleaner.scala:845)
  at org.apache.spark.util.FieldAccessFinder$$anon$4$$anonfun$visitMethodInsn$7.apply(ClosureCleaner.scala:828)
  at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
  at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
  at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
  at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
  at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)

This is Spark 3:

Spark context Web UI available at http://192.168.57.243:4040
Spark context available as 'sc' (master = local[*], app id = local-1609254662877).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/
         
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 15.0.1)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
java.lang.IllegalAccessException: Can not set final $iw field $Lambda$2159/0x0000000801470488.arg$1 to $iw
  at java.base/jdk.internal.reflect.UnsafeFieldAccessorImpl.throwFinalFieldIllegalAccessException(UnsafeFieldAccessorImpl.java:76)
  at java.base/jdk.internal.reflect.UnsafeFieldAccessorImpl.throwFinalFieldIllegalAccessException(UnsafeFieldAccessorImpl.java:80)
  at java.base/jdk.internal.reflect.UnsafeQualifiedObjectFieldAccessorImpl.set(UnsafeQualifiedObjectFieldAccessorImpl.java:79)
  at java.base/java.lang.reflect.Field.set(Field.java:793)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:398)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2362)
  at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:985)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
  at org.apache.spark.rdd.RDD.foreach(RDD.scala:984)
  ... 47 elided

1 Answer


Java 15.0.1 is the problem: Java 11 is the highest Java version Spark supports. From the Spark documentation:

Spark runs on Java 8/11, Scala 2.12, Python 2.7+/3.4+ and R 3.5+. Java 8 prior to version 8u92 support is deprecated as of Spark 3.0.0.
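
If it helps, here is a minimal sketch of how to verify the fix, assuming spark-shell has been relaunched on a supported JDK (for example by pointing JAVA_HOME at a Java 8 or 11 installation before running ./bin/spark-shell). The values in the comments are what you would expect to see, not actual output:

scala> sys.props("java.version")   // should report an 8.x or 11.x version, not 15.0.1

scala> val accum = sc.longAccumulator("My Accumulator")
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
scala> accum.value                 // expected to be 10, as in the docs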