1 vote

I'm using Google Cloud Platform to do image processing with Spark (2.0.2). When I execute my code (Java), I get this error:

[Stage 1:> (0 + 0) / 2]17/10/15 13:39:44 WARN org.apache.spark.scheduler.TaskSetManager: Stage 1 contains a task of very large size (165836 KB). The maximum recommended task size is 100 KB.

[Stage 1:> (0 + 1) / 2]Exception in thread "dispatcher-event-loop-1" java.lang.OutOfMemoryError: Java heap space

at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1853)
at java.io.ObjectOutputStream.write(ObjectOutputStream.java:709)
at java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)
at org.apache.spark.util.SerializableBuffer$$anonfun$writeObject$1.apply(SerializableBuffer.scala:49)
at org.apache.spark.util.SerializableBuffer$$anonfun$writeObject$1.apply(SerializableBuffer.scala:47)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
at org.apache.spark.util.SerializableBuffer.writeObject(SerializableBuffer.scala:47)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$launchTasks$1.apply(CoarseGrainedSchedulerBackend.scala:250)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$launchTasks$1.apply(CoarseGrainedSchedulerBackend.scala:249)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.launchTasks(CoarseGrainedSchedulerBackend.scala:249)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:220)

Where and how do I increase Java heap space?

My program:

public static void main(String[] args) {
  try{

          // Spark configuration
          SparkSession spark = SparkSession
                .builder()
                .appName("Features")
                .getOrCreate();

          JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

          // HBase configuration
          String tableName = "Descripteurs";
          Configuration conf = HBaseConfiguration.create();
          conf.addResource(new Path("/home/ibtissam/hbase-1.2.5/conf/hbase-site.xml"));
          conf.addResource(new Path("/home/ibtissam/hbase-1.2.5/conf/core-site.xml"));
          conf.set(TableInputFormat.INPUT_TABLE, tableName);

          Connection connection = ConnectionFactory.createConnection(conf);
          Admin admin = connection.getAdmin(); 
          Table tab = connection.getTable(TableName.valueOf(tableName));

          for (int n=0; n<10; n++) {
              List<String> images =new ArrayList<>();
              String repertory_ = "/home/ibtissam/images-test-10000/images-"+n+"/"; 
              File repertory = new File(repertory_);
              String files[] = repertory.list(); 

              for(int k=0; k<10;k++){
                  ExecutorService executorService = Executors.newCachedThreadPool();
                  List<MyRunnable> runnableList = new ArrayList<>();

                  for(int i=k*100; i<(k+1)*100 ; i++){
                        MyRunnable runnable = new MyRunnable(repertory_+files[i]); 
                        runnableList.add(runnable);
                        executorService.execute(runnable);
                  }
                  executorService.shutdown();

                  // Busy-wait until all image-loading threads have finished
                  while(!executorService.isTerminated()){}

                  for (int i=0; i<runnableList.size(); i++) {
                      images.add(runnableList.get(i).descripteurs_);
                  }
              }

          JavaRDD<String> rdd = jsc.parallelize(images, 2000);

          // Compute the descriptors
          JavaPairRDD<String,String> rdd_final = rdd.mapToPair(new PairFunction<String,String,String>() {
                @Override
                public Tuple2<String,String> call(String value) {

                  String strTab[] = value.split(","); 
                  int h = Integer.parseInt(strTab[1]);
                  int w = Integer.parseInt(strTab[2]);
                  String type = strTab[3]; 
                  String nom = strTab[0];
                  String key = nom+"-"+h+"-"+w+"-"+type;

                  // Convert the String back into a Mat
                  Mat image = new Mat(h, w, 16);
                  UByteRawIndexer idx = image.createIndexer();
                  int indice = 4;
                  for (int i =0;i<h;i++) {
                    for (int j=0;j<w;j++) {
                      idx.put(i, j, Integer.parseInt(strTab[indice]));
                      indice++;
                    }
                  }

                  // Compute the SIFT features
                  SIFT sift = new SIFT().create(); 
                  KeyPointVector keypoints = new KeyPointVector();
                  Mat descriptors = new Mat();

                  image.convertTo(image, CV_8UC3);

                  sift.detect(image, keypoints);

                  KeyPointVector keypoints_sorted = new KeyPointVector(); 
                  keypoints_sorted = sort(keypoints);
                  KeyPointVector  keypoints_2 = new KeyPointVector((keypoints_sorted.size())/4); 
                  for (int k = 0; k < (keypoints_sorted.size())/4; k++){
                      keypoints_2.put(k, keypoints_sorted.get(k));  
                  }

                  sift.compute(image,keypoints_2,descriptors);

                  int hDes = descriptors.size().height();
                  int wDes = descriptors.size().width();

                  // If no descriptors were produced, retry the detection/computation;
                  // the image is only released once descriptors have been obtained
                  while (hDes == 0 || wDes == 0) {
                      SIFT sift_ = new SIFT().create(); 
                      KeyPointVector keypoints_ = new KeyPointVector();

                      sift_.detect(image, keypoints_);

                      KeyPointVector keypoints_sorted_ = new KeyPointVector(); 
                      keypoints_sorted_ = sort(keypoints_);
                      KeyPointVector  keypoints_2_ = new KeyPointVector((keypoints_sorted_.size())/4); 
                      for (int k = 0; k < (keypoints_sorted_.size())/4; k++){
                          keypoints_2_.put(k, keypoints_sorted_.get(k));  
                      }

                      sift_.compute(image,keypoints_2_,descriptors);
                      hDes = descriptors.size().height();
                      wDes = descriptors.size().width();
                  }
                  image.release(); 

                  key = key +"-"+hDes+"-"+wDes+"-"+descriptors.type();

                  // Convert the features to a String 
                  String featuresStr = "";
                  FloatRawIndexer idx_ = descriptors.createIndexer(); 
                  int position =0;

                  for (int i =0;i < descriptors.size().height();i++) {
                    for (int j =0;j < descriptors.size().width();j++) {

                      // Append the values comma-separated, without a trailing comma after the last one
                      if (position == ((descriptors.size().height()*descriptors.size().width())-1) ){
                          featuresStr = featuresStr + String.valueOf(idx_.get(position));                  
                      }else{
                          featuresStr = featuresStr + String.valueOf(idx_.get(position))+","; 
                      }
                      position++;
                    }
                  }
                  descriptors.release(); 
                  Tuple2<String, String> tuple = new Tuple2<>(key, featuresStr);
                  return tuple;
                } 
              });

              System.out.println("Fin de calcul des descripteurs  .... ");

              List<Tuple2<String,String>> liste = rdd_final.collect();

              System.out.println("Insertion dans hbase .... \n");
              for (int b=0; b<liste.size(); b++) {

                    String metadata[] = liste.get(b)._1().split("-"); 
                    String data = liste.get(b)._2();
                    // Row 
                    byte [] row = Bytes.toBytes(liste.get(b)._1());

                    // Family
                    byte [] family1 = Bytes.toBytes("Metadata");
                    byte [] family2 = Bytes.toBytes("Data");

                    // Qualifiers
                    byte [] height = Bytes.toBytes("height");
                    byte [] width = Bytes.toBytes("width");
                    byte [] colorSpace = Bytes.toBytes("colorSpace");
                    byte [] name = Bytes.toBytes("name");

                    byte [] features = Bytes.toBytes("features");

                    // Create Put
                    Put put = new Put(row);
                    put.addColumn(family1, height, Bytes.toBytes(metadata[5]));
                    put.addColumn(family1, width, Bytes.toBytes(metadata[6]));
                    put.addColumn(family1, name, Bytes.toBytes(metadata[0]+"-"+metadata[1]+"-"+metadata[2]+"-"+metadata[3]));
                    put.addColumn(family1, colorSpace, Bytes.toBytes(metadata[4]));
                    put.addColumn(family2, features, Bytes.toBytes(liste.get(b)._2()));
                    tab.put(put);
              }
            }
            jsc.close();

      }catch(Exception e){

        System.out.println(e);
      }
    }
You really need to focus on reducing your task size, not increasing your JVM size. – Joe C
Why does your task need ~160 MB of heap space? Please add your program to your question. – Progman
Looks like broadcast could help here, as there's a lookup table used inside (or something very similar). – Jacek Laskowski
@Progman I've added my program to the question. – Ibi_Bo
@JoeC I have reduced my task size to 370 KB and I can't do better than that. Can you tell me how to increase the heap size on Google Cloud? – Ibi_Bo

1 Answer

0 votes

Try increasing the driver heap with "--driver-memory XXXXm" when you submit the job.
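For example, with spark-submit the flag goes on the command line. A minimal sketch, assuming the application is packaged as features.jar with main class Features (placeholder names) and that 4g is just an illustrative size:

# Placeholder jar/class names; pick memory sizes that fit your data and machines
spark-submit \
  --driver-memory 4g \
  --executor-memory 4g \
  --class Features \
  features.jar

On Google Cloud Dataproc the same settings can be passed as Spark properties when submitting the job, something like (cluster and file names are placeholders):

gcloud dataproc jobs submit spark \
  --cluster my-cluster \
  --class Features \
  --jars features.jar \
  --properties spark.driver.memory=4g,spark.executor.memory=4g

Note that the warning in your log ("Stage 1 contains a task of very large size (165836 KB)") means the serialized tasks themselves are huge, because the image data is built on the driver and shipped inside each task. As the comments suggest, parallelizing only the file paths and reading/decoding the images inside the map function would shrink the tasks far more than any heap increase.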