I'm using Google Cloud Platform to do image processing with Spark (2.0.2). When I execute my code (Java), I get this error:
[Stage 1:> (0 + 0) / 2]17/10/15 13:39:44 WARN org.apache.spark.scheduler.TaskSetManager: Stage 1 contains a task of very large size (165836 KB). The maximum recommended task size is 100 KB.
[Stage 1:> (0 + 1) / 2]Exception in thread "dispatcher-event-loop-1" java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1853)
at java.io.ObjectOutputStream.write(ObjectOutputStream.java:709)
at java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)
at org.apache.spark.util.SerializableBuffer$$anonfun$writeObject$1.apply(SerializableBuffer.scala:49)
at org.apache.spark.util.SerializableBuffer$$anonfun$writeObject$1.apply(SerializableBuffer.scala:47)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
at org.apache.spark.util.SerializableBuffer.writeObject(SerializableBuffer.scala:47)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$launchTasks$1.apply(CoarseGrainedSchedulerBackend.scala:250)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$launchTasks$1.apply(CoarseGrainedSchedulerBackend.scala:249)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.launchTasks(CoarseGrainedSchedulerBackend.scala:249)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:220)
Where and how do I increase Java heap space?
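For reference, I assume the relevant knobs are the driver and executor heap sizes. A minimal sketch of where they could be set (the values below are placeholders; spark.driver.memory itself normally has to be passed at submit time, e.g. via spark-submit --driver-memory, because the driver JVM is already running by the time this code executes):

SparkSession spark = SparkSession
        .builder()
        .appName("Features")
        .config("spark.executor.memory", "4g")        // executor heap (placeholder value)
        .config("spark.driver.maxResultSize", "2g")   // cap on data collected back to the driver (placeholder)
        .getOrCreate();
// Driver heap: spark-submit --driver-memory 4g   (or the equivalent --properties setting on Dataproc)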
My program:
public static void main(String[] args) {
try{
//Spark configuration ....
SparkSession spark = SparkSession
.builder()
.appName("Features")
.getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
//HBase configuration ....
String tableName = "Descripteurs";
Configuration conf = HBaseConfiguration.create();
conf.addResource(new Path("/home/ibtissam/hbase-1.2.5/conf/hbase-site.xml"));
conf.addResource(new Path("/home/ibtissam/hbase-1.2.5/conf/core-site.xml"));
conf.set(TableInputFormat.INPUT_TABLE, tableName);
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
Table tab = connection.getTable(TableName.valueOf(tableName));
for (int n=0; n<10; n++) {
List<String> images =new ArrayList<>();
String repertory_ = "/home/ibtissam/images-test-10000/images-"+n+"/";
File repertory = new File(repertory_);
String files[] = repertory.list();
for(int k=0; k<10;k++){
ExecutorService executorService = Executors.newCachedThreadPool();
List<MyRunnable> runnableList = new ArrayList<>();
for(int i=k*100; i<(k+1)*100 ; i++){
MyRunnable runnable = new MyRunnable(repertory_+files[i]);
runnableList.add(runnable);
executorService.execute(runnable);
}
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); // block until the feature threads finish (needs java.util.concurrent.TimeUnit) instead of busy-spinning
for (int i=0; i<runnableList.size(); i++) {
images.add(runnableList.get(i).descripteurs_);
}
}
JavaRDD<String> rdd = jsc.parallelize(images, 2000);
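// Note: at this point `images` holds the raw pixel data of up to 1000 images as comma-separated
// Strings, and parallelize() serializes that data into the tasks it sends out -- presumably what
// the "task of very large size" warning and the OutOfMemoryError in dispatcher-event-loop refer to.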
//Computing the descriptors
JavaPairRDD<String,String> rdd_final = rdd.mapToPair(new PairFunction<String,String,String>() {
@Override
public Tuple2<String,String> call(String value) {
String strTab[] = value.split(",");
int h = Integer.parseInt(strTab[1]);
int w = Integer.parseInt(strTab[2]);
String type = strTab[3];
String nom = strTab[0];
String key = nom+"-"+h+"-"+w+"-"+type;
// Conversion String >> Mat
Mat image = new Mat(h, w, 16);
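// (type 16 above is the OpenCV constant CV_8UC3: 8-bit unsigned, 3 channels)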
UByteRawIndexer idx = image.createIndexer();
int indice = 4;
for (int i =0;i<h;i++) {
for (int j=0;j<w;j++) {
idx.put(i, j, Integer.parseInt(strTab[indice]));
indice++; // "indice = indice++" never advances the index, so every pixel would read strTab[4]
}
}
// Computing the features
SIFT sift = new SIFT().create();
KeyPointVector keypoints = new KeyPointVector();
Mat descriptors = new Mat();
image.convertTo(image, CV_8UC3);
sift.detect(image, keypoints);
KeyPointVector keypoints_sorted = sort(keypoints);
KeyPointVector keypoints_2 = new KeyPointVector((keypoints_sorted.size())/4);
for (int k = 0; k < (keypoints_sorted.size())/4; k++){
keypoints_2.put(k, keypoints_sorted.get(k));
}
sift.compute(image,keypoints_2,descriptors);
int hDes = descriptors.size().height();
int wDes = descriptors.size().width();
// Retry while no descriptors were found. The image must stay alive for this loop, and
// hDes/wDes have to be refreshed inside it, otherwise the loop never terminates.
while (hDes == 0 || wDes == 0) {
SIFT sift_ = new SIFT().create();
KeyPointVector keypoints_ = new KeyPointVector();
sift_.detect(image, keypoints_);
KeyPointVector keypoints_sorted_ = sort(keypoints_);
KeyPointVector keypoints_2_ = new KeyPointVector((keypoints_sorted_.size())/4);
for (int k = 0; k < (keypoints_sorted_.size())/4; k++){
keypoints_2_.put(k, keypoints_sorted_.get(k));
}
sift_.compute(image, keypoints_2_, descriptors);
hDes = descriptors.size().height();
wDes = descriptors.size().width();
}
key = key +"-"+hDes+"-"+wDes+"-"+descriptors.type();
image.release();
// Conversion of the features => String
StringBuilder featuresStr = new StringBuilder();
FloatRawIndexer idx_ = descriptors.createIndexer();
int total = descriptors.size().height() * descriptors.size().width();
for (int position = 0; position < total; position++) {
featuresStr.append(idx_.get(position));
if (position < total - 1) {
featuresStr.append(",");
}
}
descriptors.release();
Tuple2<String, String> tuple = new Tuple2<>(key, featuresStr.toString());
return tuple;
}
});
System.out.println("Fin de calcul des descripteurs .... ");
List<Tuple2<String,String>> liste = rdd_final.collect();
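// Reminder: collect() brings every (key, features) pair back into the driver's heap before the HBase writes below.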
System.out.println("Insertion dans hbase .... \n");
for (int b=0; b<liste.size(); b++) {
String metadata[] = liste.get(b)._1().split("-");
String data = liste.get(b)._2();
// Row
byte [] row = Bytes.toBytes(liste.get(b)._1());
// Family
byte [] family1 = Bytes.toBytes("Metadata");
byte [] family2 = Bytes.toBytes("Data");
// Qualifiers
byte [] height = Bytes.toBytes("height");
byte [] width = Bytes.toBytes("width");
byte [] colorSpace = Bytes.toBytes("colorSpace");
byte [] name = Bytes.toBytes("name");
byte [] features = Bytes.toBytes("features");
// Create Put
Put put = new Put(row);
put.addColumn(family1, height, Bytes.toBytes(metadata[5]));
put.addColumn(family1, width, Bytes.toBytes(metadata[6]));
put.addColumn(family1, name, Bytes.toBytes(metadata[0]+"-"+metadata[1]+"-"+metadata[2]+"-"+metadata[3]));
put.addColumn(family1, colorSpace, Bytes.toBytes(metadata[4]));
put.addColumn(family2, features, Bytes.toBytes(liste.get(b)._2()));
tab.put(put);
}
}
jsc.close();
}catch(Exception e){
System.out.println(e);
}
}
broadcast could help here as there's a lookup table used inside (or something very similar). – Jacek Laskowski
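A minimal sketch of what a broadcast variable looks like in this Java/Spark setup, assuming a hypothetical driver-side lookup table someLookupMap (it does not exist in the code above):

import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;

// Ship the lookup table once per executor instead of serializing it into every task
Broadcast<Map<String, String>> lookupBc = jsc.broadcast(someLookupMap);

JavaPairRDD<String, String> rddWithLookup = rdd.mapToPair(value -> {
    Map<String, String> lookup = lookupBc.value();        // read-only copy on the executor
    return new Tuple2<>(value, lookup.getOrDefault(value, ""));
});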