
I cannot resolve the serialization issue below, which is triggered by filtered.foreachPartition(iter => {. I thought that foreachPartition would avoid the serialization problem, but that is not the case. So how do I use redisPool?

EDIT (I updated the code to make it more clear):

val redis_host = "localhost"
val redis_port = 6379
messages.foreachRDD(rdd => {
  rdd.foreachPartition(iter => {
    val redisPool = new Pool(new JedisPool(new JedisPoolConfig(), redis_host, redis_port, 2000))
    iter.foreach({ msg =>
      println(msg.mkString(","))
    })
  })
})

I assume that the variables redis_host and redis_port are not serializable, but how do I serialize them correctly so that the code works on the cluster and not only locally?

The code above throws the following error:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:135)
    at org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:134)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer
Serialization stack:
    - object not serializable (class: org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer, value: org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer@3fba5c74)
    - field (class: org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, name: $outer, type: class org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer)
    - object (class org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, )
    - field (class: org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, name: $outer, type: class org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1)
    - object (class org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more
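
The $outer entries in the serialization stack are the key clue: the closure passed to foreachPartition captures the enclosing KafkaDecisionsConsumer instance, and Spark then tries to serialize that whole object. A minimal sketch of the usual workaround, assuming redis_host and redis_port are fields of that class as the trace suggests, is to copy them into local vals before the closure, so that only plain serializable values are captured:

// Local copies: the closure now captures these vals, not the
// enclosing class, so there is no $outer reference to serialize.
val host = redis_host
val port = redis_port
messages.foreachRDD(rdd => {
  rdd.foreachPartition(iter => {
    // Created on the worker, so the pool itself is never serialized.
    val redisPool = new Pool(new JedisPool(new JedisPoolConfig(), host, port, 2000))
    iter.foreach(msg => println(msg.mkString(",")))
  })
})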

Comments:

Could you share the serialization exception? – maasg
Using mapPartitions or foreachPartition will help for your redisPool (which is created locally on the worker, rather than serialized as it would be if you had created it on the master), but are the objects inside your RDDs serializable? It would help if you provided a complete example, including the content of myDstream. – Pascal Soucy
What is filtered? Why aren't you using rdd? – Yuval Itzchakov
@YuvalItzchakov: Indeed, it was not very clear, sorry. I updated the code snippet; it is now an exact copy-paste of what I have, and I also added the complete error stack. I really want to know what is happening with this code. – duckertito
@maasg: OK, please take a look at the update. – duckertito

1 Answer


The solution is to initialise the pool lazily inside your function, on the worker: only the serializable fields (the host and port) are shipped to the executors, while the pool itself is marked transient and rebuilt there on first use. In Java you could do something like this (the JavaRDD element type is assumed to be String for illustration):

messages.foreachRDD(new RedisFunction(redis_host, redis_port));

class RedisFunction implements VoidFunction<JavaRDD<String>> {
    // The pool is not serializable, so it is marked transient: it is
    // skipped when the function is serialized and rebuilt on the worker.
    private transient Pool pool = null;
    private final String redis_host;
    private final int redis_port;

    RedisFunction(String redis_host, int redis_port) {
        this.redis_host = redis_host;
        this.redis_port = redis_port;
    }

    private void initPool() {
        this.pool = new Pool(new JedisPool(new JedisPoolConfig(), redis_host, redis_port, 2000));
    }

    public void call(JavaRDD<String> rdd) {
        if (this.pool == null) {   // null after deserialization on the worker
            initPool();
        }
        rdd = rdd.map(....);  // your RDD transformations go here
        rdd.count();          // Spark action that triggers the computation
    }
}

The Java example above should help you fix the serialisation issue: only the host and port cross the wire, and each worker rebuilds the pool locally the first time call() runs after deserialization.
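
For the Scala code in the question, the same idea can be expressed with a @transient lazy val. Below is a minimal sketch, not a definitive fix: it uses JedisPool directly because the API of the Pool wrapper is not shown, and the RedisSink name and the "messages" key are illustrative assumptions.

class RedisSink(host: String, port: Int) extends Serializable {
  // Skipped during serialization; re-created lazily in whichever
  // JVM first touches it, i.e. on the executor.
  @transient lazy val pool: JedisPool =
    new JedisPool(new JedisPoolConfig(), host, port, 2000)
}

val sink = new RedisSink(redis_host, redis_port)
messages.foreachRDD(rdd => {
  rdd.foreachPartition(iter => {
    val jedis = sink.pool.getResource  // connection obtained on the worker
    try {
      iter.foreach(msg => jedis.rpush("messages", msg.mkString(",")))
    } finally {
      jedis.close()  // returns the connection to the pool
    }
  })
})

Since each task deserializes its own copy of sink, this creates one pool per task rather than one per executor JVM; a singleton object holding the lazy pool would avoid that, but the sketch above stays closest to the code in the question.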