5
votes

I'm using a UDF written in Python to change the base of a number.

I read a Parquet file, apply the UDF, and write the result back to a Parquet file. Here is the line I run:

input_df.withColumn("origin_base", convert_2_dest_base(input_df.origin_base)).write.mode('overwrite').parquet(destination_path)
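
For illustration (the UDF body isn't shown above, so the conversion logic and the target base here are placeholders), a base-conversion UDF of this kind typically looks something like:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    def to_base(n, base):
        # hypothetical conversion: repeatedly divide and collect digits
        digits = "0123456789abcdefghijklmnopqrstuvwxyz"
        n = int(n)
        out = ""
        while n > 0:
            out = digits[n % base] + out
            n //= base
        return out or "0"

    # wrap the plain Python function as a Spark UDF returning a string
    convert_2_dest_base = udf(lambda n: to_base(n, 16), StringType())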

This conversion makes Spark use a lot of memory, and I get warnings like this:

17/06/18 08:05:39 WARN TaskSetManager: Lost task 40.0 in stage 4.0 (TID 183, ip-10-100-5-196.ec2.internal, executor 19): ExecutorLostFailure (executor 19 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 4.4 GB of 4.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

and in the end the job fails.

Is a UDF not the right approach here? Why is it consuming so much memory?

2
Hard to tell what's the right approach without sharing a fully reproducible example. - mtoto

2 Answers

6
votes

For PySpark, data is processed in Python and cached/shuffled in the JVM. If you use the built-in Python DataFrame API, there is not much difference in performance compared to Scala. See python vs scala performance


When you use a udf, your locally defined function is not registered in the JVM's native structures and so cannot be executed by a simple Java API call; the data has to be serialized and sent to a Python worker. The data is then processed in the Python worker and serialized/deserialized back to the JVM.

The Python worker then needs to process the serialized data in off-heap memory; this consumes a lot of off-heap memory and often leads to exceeding memoryOverhead.
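
As the YARN warning itself suggests, a short-term mitigation is to raise the executor memory overhead, which is the off-heap allowance the Python workers run in. A sketch, assuming Spark on YARN and a placeholder value of 2048 MB and script name (this treats the symptom, not the serialization cost itself):

    spark-submit \
      --conf spark.yarn.executor.memoryOverhead=2048 \
      your_job.py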

Performance-wise, serialization is slow, and it is often the key area for performance tuning.
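
If the logic has to stay in Python, one way to reduce the serialization cost is a vectorized pandas_udf, which moves whole Arrow batches instead of pickling row by row. This is only a sketch and assumes Spark 2.3+ with PyArrow available, which the question does not state; the hex conversion is a placeholder:

    from pyspark.sql.functions import pandas_udf, PandasUDFType

    @pandas_udf("string", PandasUDFType.SCALAR)
    def convert_2_dest_base_vec(nums):
        # hypothetical element-wise conversion, applied per Arrow batch
        return nums.map(lambda n: format(int(n), "x"))

    result = input_df.withColumn("origin_base", convert_2_dest_base_vec(input_df.origin_base))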

0
votes

The udf function uses serialization and deserialization to convert the column values; that is the reason it uses so much memory. You can look at the built-in Spark SQL functions for alternatives.
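
For this particular problem (changing the base of a number), the built-in pyspark.sql.functions.conv expression keeps the whole conversion in the JVM and avoids the Python round-trip entirely. A minimal sketch; the bases below (10 to 16) are assumptions, since the question does not say which bases are involved:

    from pyspark.sql import functions as F

    # conv(col, fromBase, toBase) converts a string/number column between bases
    out_df = input_df.withColumn("origin_base", F.conv(F.col("origin_base"), 10, 16))
    out_df.write.mode("overwrite").parquet(destination_path)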