I wrote some code to read a Parquet file, change the schema slightly, and write the data to a new Parquet file. The code looks as follows:
...
val schema = StructType(
  List(
    StructField("id", LongType, false),
    StructField("data", ArrayType(FloatType), false)
  )
)
val data = sqlContext.read.parquet(file.getAbsolutePath)
val revisedData = data.map(r => Row(r.getInt(0).toLong, r.getSeq[Float](1)))
val df = sqlContext.createDataFrame(revisedData, schema)
Writer.writeToParquet(df)
with Writer being
object Writer {
  def writeToParquet(df: DataFrame): Unit = {
    val future = Future {
      df.write.mode(SaveMode.Append).save(path)
    }
    Await.ready(future, Duration.Inf)
  }
}
For a file of about 4 GB my program breaks with an OutOfMemoryError: Java heap space. I have set the executor memory to 6 GB (using -Dspark.executor.memory=6g), raised the JVM heap space (using -Xmx6g), and increased the Kryo serializer buffer to 2 GB (using System.setProperty("spark.kryoserializer.buffer.mb", "2048")). However, I still get the error.
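For reference, here is a sketch of how the same settings could be passed through SparkConf rather than -D flags and System.setProperty (the property names assume a pre-1.4 Spark build, where the Kryo buffer is still configured in megabytes via spark.kryoserializer.buffer.mb):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Sketch: the same settings expressed through SparkConf.
// Property names assume a pre-1.4 Spark version.
val conf = new SparkConf()
  .setAppName("parquet-schema-rewrite")
  .set("spark.executor.memory", "6g")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", "2048")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)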
This is the stack trace:
java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.io.Output.<init>(Output.java:35)
at org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:76)
at org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:243)
at org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:243)
at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:247)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:236)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)
What can I do to avoid this error?
Comments:

Mateusz Dymczyk: First of all, I don't see you using revisedData anywhere. Second, where are you getting the OOM exactly? Lastly, what's the structure of the file (how many columns)?

navigembone: At com.esotericsoftware.kryo.io.Output.<init>(Output.java:35) (see the updated post with the stack trace). The original file has two columns, i.e. an Int and a Seq[Float] column.

Mateusz Dymczyk: I would try decreasing the buffer size; 2 GB is way too much. It needs to be big enough for your data though, so check how long your records are and try going with the smallest possible buffer. It looks like that Spark version is using kryo-2.22, which basically tries to do buffer = new byte[bufferSize]; but doesn't have that much space.
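Following that suggestion, a minimal sketch of a smaller buffer configuration (the values are assumptions and must still fit the largest serialized record; the .max.mb key is assumed to be available in this Spark version):

import org.apache.spark.SparkConf

// Sketch: keep the initial Kryo buffer small instead of pre-allocating 2 GB
// per serializer instance. The values below are assumptions; tune them to
// the size of the largest record.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", "64")       // initial buffer: 64 MB
  .set("spark.kryoserializer.buffer.max.mb", "512")  // upper bound for buffer growth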