My application works well on large datasets of tiny objects, but with larger objects the operations on the DataFrame run into out-of-memory exceptions. Basically, the application reads a sequence file, transforms each key/value pair into a row object with a few columns, does some sorting, and finally converts back to an RDD to write a sequence file.
If a row object is tiny - about 20 KB of data - everything is fine. In a second run the row objects contain about 2 MB of data each, and Spark runs into out-of-memory issues. I tested several options, changing partition size and count, but the application does not run stably.
To reproduce the issue, I created the following example code. An RDD of 10,000 Integer objects is mapped to strings of 2 MB length (probably 4 MB, assuming 16 bits per char). 10,000 times 2 MB is about 20 GB of data. Calling show(), sorting, and calling show() again works as expected; the logs indicate Spark is spilling to disk. But writing the result to a CSV file, to a Hadoop file, or even just calling toJavaRDD() ends the application with out-of-memory: Java heap space. It seems Spark is holding the data in memory only.
// create spark session
SparkSession spark = SparkSession.builder()
        .appName("example1")
        .master("local[2]")
        .config("spark.driver.maxResultSize", "0")
        .getOrCreate();
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

// base data used to generate huge rows
List<Integer> list = new ArrayList<>();
for (int val = 1; val <= 10000; val++) {
    list.add(val);
}

// create a simple RDD of Integer
JavaRDD<Integer> rdd = sc.parallelize(list, 200);

// use map to create one large object per row
JavaRDD<Row> rowRDD = rdd.map(value ->
        RowFactory.create(String.valueOf(value),
                createLongText(UUID.randomUUID().toString(), 2 * 1024 * 1024)));

StructType type = new StructType()
        .add("c1", DataTypes.StringType)
        .add("c2", DataTypes.StringType);
Dataset<Row> df = spark.createDataFrame(rowRDD, type);
// works
df.show();

df = df.sort(col("c2").asc());
// takes a lot of time but works
df.show();

// OutOfMemoryError: Java heap space
df.write().csv("d:/temp/my.csv");

// OutOfMemoryError: Java heap space
df.toJavaRDD()
  .mapToPair(row -> new Tuple2<>(new Text(row.getString(0)), new Text(row.getString(1))))
  .saveAsHadoopFile("d:\\temp\\foo", Text.class, Text.class, SequenceFileOutputFormat.class);
The method createLongText() doubles the given text until it reaches the requested minimum length:
private static String createLongText(String text, int minLength) {
    String longText = text;
    while (longText.length() < minLength) {
        longText = longText + longText;
    }
    return longText;
}
The JVM runs with 4 GB of heap (-Xms4g -Xmx4g) on Java 1.8u171, with the default GC and optionally with -XX:+UseParallelGC, in local mode. Heap usage grows towards the upper limit and GC activity rises. In most cases the OOM crashes the driver; in some cases the high GC activity causes timeouts to the driver. This is the first time I had to set spark.driver.maxResultSize to 0 (unlimited). When working with smaller objects (nearly the same total amount of data, spread over more objects with less data each), everything worked fine.

It seems any conversion from DataFrame to RDD binds a lot of memory and forces all results to be sent back to the driver. I created a heap dump just before the crash and see a large Scala array of arrays. Each inner array contains about 32 MB (the partition size?), and the number of inner arrays is about the number of partitions (115).
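A rough back-of-the-envelope check of the heap dump numbers (my own arithmetic, assuming the ~32 MB inner-array size and the 115 arrays I counted in the dump) shows they account for nearly the whole 4 GB heap:

```java
public class HeapDumpMath {
    public static void main(String[] args) {
        long innerArrayBytes = 32L * 1024 * 1024; // ~32 MB per inner array seen in the dump
        long arrayCount = 115;                    // number of inner arrays (~completed partitions)
        long totalBytes = innerArrayBytes * arrayCount;
        // ~3.6 GB of task-result data -- almost the entire 4 GB heap
        System.out.printf("%.2f GB%n", totalBytes / (1024.0 * 1024 * 1024));
    }
}
```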
18/07/26 21:58:31 INFO MemoryStore: Block taskresult_144 stored as bytes in memory (estimated size 47.7 MB, free 86.3 MB)
18/07/26 21:58:31 INFO BlockManagerInfo: Added taskresult_144 in memory on blackhawk:61185 (size: 47.7 MB, free: 86.4 MB)
18/07/26 21:58:31 INFO Executor: Finished task 143.0 in stage 1.0 (TID 144). 50033768 bytes result sent via BlockManager)
18/07/26 21:58:31 INFO TaskSetManager: Starting task 145.0 in stage 1.0 (TID 146, localhost, executor driver, partition 145, PROCESS_LOCAL, 8355 bytes)
18/07/26 21:58:31 INFO Executor: Running task 145.0 in stage 1.0 (TID 146)
18/07/26 21:58:35 INFO TaskSetManager: Finished task 104.0 in stage 1.0 (TID 105) in 36333 ms on localhost (executor driver) (105/200)
18/07/26 21:58:35 ERROR Executor: Exception in task 144.0 in stage 1.0 (TID 145)
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.lang.StringCoding.safeTrim(StringCoding.java:79)
at java.lang.StringCoding.encode(StringCoding.java:365)
at java.lang.String.getBytes(String.java:941)
at org.apache.spark.unsafe.types.UTF8String.fromString(UTF8String.java:141)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_1$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:592)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:592)
Any idea what is wrong here? Or is this a bug in Spark 2.3.1?