
I'm creating a DF by reading a CSV file in PySpark and then converting it into an RDD to apply a UDF. It throws an error when the UDF is applied.

Here's my code snippet -

from pyspark.sql.types import FloatType

# My UDF definition
def my_udf(string_array):
    # some code
    return float_var

spark.udf.register("my_udf", my_udf, FloatType())

#Read from csv file
read_data=spark.read.format("csv").load("/path/to/file/part-*.csv", header="true")

rdd = read_data.rdd

get_df = rdd.map(lambda x: (x[0], x[1], my_udf(x[2]))).toDF(["col1", "col2","col3"])

Sample data in read_data DF -

[Row(Id='ABCD505936', some_string='XCOYNZGAE', array='[0, 2, 5, 6, 8, 10, 12, 13, 14, 15]')]

The schema of the DF created by reading from CSV file -

print (read_data.schema)
StructType(List(StructField(col1,StringType,true),StructField(col2,StringType,true),StructField(col3,StringType,true)))

I get the following error when applying the UDF at the get_df line -

Traceback (most recent call last):
  File "", line 1, in
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 58, in toDF
    return sparkSession.createDataFrame(self, schema, sampleRatio)
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 746, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 390, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio, names=schema)
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 377, in _inferSchema
    raise ValueError("Some of types cannot be determined by the "
ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling

Can anyone please help me in passing the array (datatype is string) to the UDF?

Why are you using an RDD instead of the dataframe? - OneCricketeer
Hi @cricket_007, I admit that I'm new to PySpark. Somebody advised me that when applying a UDF we should convert the DF to an RDD so that we can use map, which makes the execution parallel instead of serial as with a DF. Please correct me if this understanding is wrong; it would be really helpful. - sopana
UDFs are a SparkSQL concept, only to be used with DataFrames. In RDD terms, they are just methods. - OneCricketeer
Hi @cricket_007, so there is no such thing as parallel execution of the UDF (method) with the "map" function of an RDD? - sopana
map() is a method of both dataframes and RDDs. Both operate in parallel. In order to use a UDF, you would go through the SparkSQL API: docs.databricks.com/spark/latest/spark-sql/udf-python.html - OneCricketeer

1 Answer


Two things:

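  1. If you convert the DF to an RDD, you don't need to register my_udf as a UDF; inside rdd.map it is just a plain Python function. If you do want a UDF, apply it directly to the DataFrame through the column API, for example read_data.withColumn("col3", F.udf(my_udf, FloatType())(F.col("col3"))), where F is pyspark.sql.functions.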

  2. The error you hit occurs at the toDF step: you don't specify a schema for the new DF created from the RDD, so Spark tries to infer the column types from sample data (the first 100 rows, per the error message). In your case that inference fails, which typically happens when every sampled value in a column is None. I would create the schema manually and pass it to toDF like this:

from pyspark.sql.types import StringType, FloatType, StructField, StructType

# Explicit schema, so Spark does not have to infer the column types
get_schema = StructType([
    StructField('col1', StringType(), True),
    StructField('col2', StringType(), True),
    StructField('col3', FloatType(), True),
])
get_df = rdd.map(lambda x: (x[0], x[1], my_udf(x[2]))).toDF(get_schema)
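
On the original question of passing the array when it arrives as a string: the CSV reader gives col3 as text such as '[0, 2, 5]', so the function has to parse it before doing any arithmetic. Below is a minimal plain-Python sketch of that step. The names parse_int_array and array_mean are hypothetical, and the mean is only a stand-in for whatever the elided body of my_udf computes. Returning None on bad input is an assumption too, and worth noticing: if a function returns None for every sampled row, schema inference cannot determine the column type, which is one more reason to pass the schema explicitly.

```python
import json
from typing import List, Optional


def parse_int_array(raw: Optional[str]) -> Optional[List[int]]:
    """Parse a string like '[0, 2, 5]' into a list of ints; return None if that fails."""
    if raw is None:
        return None
    try:
        values = json.loads(raw)
        if not isinstance(values, list):
            return None
        return [int(v) for v in values]
    except (TypeError, ValueError):
        # Malformed JSON or non-numeric elements
        return None


def array_mean(raw: Optional[str]) -> Optional[float]:
    """Hypothetical stand-in for my_udf: mean of the parsed array, or None."""
    values = parse_int_array(raw)
    if not values:
        # Covers both unparseable input and an empty array
        return None
    return float(sum(values)) / len(values)


sample = '[0, 2, 5, 6, 8, 10, 12, 13, 14, 15]'
print(array_mean(sample))  # 8.5
```

A function of this shape could be called in place of my_udf inside the rdd.map lambda, or wrapped with F.udf and a FloatType return type to use it on the DataFrame directly.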