I'm creating a DataFrame by reading a CSV file in PySpark and then converting it to an RDD to apply a UDF. It throws an error when the UDF is applied.
Here's my code snippet -
from pyspark.sql.types import FloatType

# My UDF definition
def my_udf(string_array):
    # some code
    return float_var

spark.udf.register("my_udf", my_udf, FloatType())
# Read from CSV file
read_data = spark.read.format("csv").load("/path/to/file/part-*.csv", header="true")
rdd = read_data.rdd
get_df = rdd.map(lambda x: (x[0], x[1], my_udf(x[2]))).toDF(["col1", "col2", "col3"])
Sample data in the read_data DF -
[Row(Id='ABCD505936', some_string='XCOYNZGAE', array='[0, 2, 5, 6, 8, 10, 12, 13, 14, 15]')]
The schema of the DF created by reading the CSV file -
print (read_data.schema)
StructType(List(StructField(col1,StringType,true),StructField(col2,StringType,true),StructField(col3,StringType,true)))
I get the following error while applying the UDF at the get_df line -
Traceback (most recent call last):
  File "", line 1, in
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 58, in toDF
    return sparkSession.createDataFrame(self, schema, sampleRatio)
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 746, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 390, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio, names=schema)
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 377, in _inferSchema
    raise ValueError("Some of types cannot be determined by the "
ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling
Can anyone help me pass the array (whose datatype is string) to the UDF?
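Note that the array column arrives from the CSV as a plain string like '[0, 2, 5, 6, ...]', not as a list, so the function has to parse it first. A minimal sketch in plain Python (no Spark needed) of parsing with ast.literal_eval; the averaging body is a stand-in, since the real body of my_udf isn't shown:

```python
import ast

def my_udf(string_array):
    # The column value is a string such as '[0, 2, 5, 6]';
    # literal_eval safely parses it into a Python list of ints.
    values = ast.literal_eval(string_array)
    # Stand-in computation -- the real body isn't shown in the question.
    return float(sum(values)) / len(values)

my_udf('[0, 2, 5, 6, 8, 10, 12, 13, 14, 15]')  # 8.5
```

Once the string is parsed inside the function, the same function works whether it is called from rdd.map or registered as a SQL UDF.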
map() is a method of both dataframes and RDDs. Both operate in parallel. In order to use a UDF, you would go through the SparkSQL API - docs.databricks.com/spark/latest/spark-sql/udf-python.html – OneCricketeer