2
votes

I wrote some code to read a parquet file, adjust the schema slightly, and write the data to a new parquet file. The code looks as follows:

...
val schema = StructType(
  List(
    StructField("id", LongType, false),
    StructField("data", ArrayType(FloatType), false)
  )
)

val data = sqlContext.read.parquet(file.getAbsolutePath)
val revisedData = data.map(r => Row(r.getInt(0).toLong, r.getSeq[Float](1)))
val df = sqlContext.createDataFrame(revisedData, schema)

Writer.writeToParquet(df)

with Writer being

object Writer {
  def writeToParquet(df: DataFrame): Unit = {
    val future = Future {
      df.write.mode(SaveMode.Append).save(path)
    }

    Await.ready(future, Duration.Inf)
  }
}

For a file of about 4 GB my program breaks with an OutOfMemoryError: Java heap space. I have set the executor memory to 6 GB (using -Dspark.executor.memory=6g), raised the JVM heap space (using -Xmx6g), and increased the Kryo serializer buffer to 2 GB (using System.setProperty("spark.kryoserializer.buffer.mb", "2048")). However, I still get the error.

This is the stack trace:

java.lang.OutOfMemoryError: Java heap space
  at com.esotericsoftware.kryo.io.Output.<init>(Output.java:35)
  at org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:76)
  at org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:243)
  at org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:243)
  at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:247)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:236)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:744)

What can I do to avoid this error?

First of all, you're not using revisedData anywhere. Second, where exactly are you getting the OOM? Lastly, what's the structure of the file (how many columns)? - Mateusz Dymczyk
Sorry, I have corrected the bug in the code. The OOM is raised at com.esotericsoftware.kryo.io.Output.<init>(Output.java:35) (see the updated post with the stack trace). The original file has two columns, i.e. an Int column and a Seq[Float] column. - navige
Which version of Spark are you using? Make sure to use the appropriate Kryo serializer buffer property (1.4.1 doesn't have the .mb one). I would try decreasing the buffer size; 2 GB is way too much. It needs to be big enough for your data though, so check how long your records are and try going with the smallest possible buffer. It looks like that Spark version is using kryo-2.22, which basically tries to do buffer = new byte[bufferSize]; but doesn't have that much space. - Mateusz Dymczyk
Yes, now it works! I was using the wrong buffer property from an old version of Spark. Switched the properties and now it runs smoothly. Thanks! - navige
@navige OK, added it as an answer then. - Mateusz Dymczyk

2 Answers

4
votes

Following my comment, two things:

1) You need to watch out for the spark.kryoserializer.buffer.mb property name; in newer Spark versions it was replaced by spark.kryoserializer.buffer and spark.kryoserializer.buffer.max.

2) You have to be careful with the buffer size relative to your heap size: it has to be big enough to hold a single record you are writing, but not much more, since Kryo allocates an explicit byte[] of that size (and allocating a single 2 GB byte array is usually a bad idea). Try lowering the buffer size using the proper property, as in the sketch below.
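
For example, a minimal sketch of setting the newer property names on a SparkConf (the buffer values here are only illustrative; size them to fit your largest record):

import org.apache.spark.SparkConf

// Sketch only: use the post-1.4 property names and a modest buffer.
// "64k" and "64m" are the usual defaults, not recommendations for this data.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer", "64k")      // initial per-task buffer
  .set("spark.kryoserializer.buffer.max", "64m")  // upper bound the buffer may grow to

With these names the buffer starts small and only grows toward the max when a record actually needs it, instead of allocating a fixed 2 GB array up front.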

0
votes

Using sparklyr, I had the same OutOfMemoryError, despite reducing spark.kryoserializer.buffer, and was not able to read a parquet file I had been able to write. My solution was to:

disable the "eager" memory load option (memory = FALSE):

spark_read_parquet(sc, name = curName, path = file.path("file://", srcFile), memory = FALSE)

Spark 2.3.0, sparklyr 1.0.0, R version 3.4.2