I am new to Databricks. My task is to read a number of large CSV files (up to 1 GB in size), then validate and clean every field ready for a PolyBase load into an Azure SQL Data Warehouse. The files are stored in Azure Blob Storage.
I thought that Databricks and Python would be an approach that would give sensible performance.
I have used the quickstart below as a starting point: https://docs.microsoft.com/en-us/azure/azure-databricks/quickstart-create-databricks-workspace-portal
I want to perform a number of cleaning replacements on every field, run a regex to filter out any other unwanted characters, and finally trim to remove trailing spaces. The test snippet below gives a flavour of the type of validation I wish to perform: it uses a helper function to translate a value and then a regex to filter unwanted characters, working against the example DataFrame from the link above.
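For context, df in the snippets below comes from reading one of the CSV files, roughly as in the quickstart; the container, account and file names here are placeholders for my own, and I have left out the storage-account key configuration:

# Rough sketch of how df is created - placeholder path, credentials set up as per the quickstart
csv_path = "wasbs://<container>@<storageaccount>.blob.core.windows.net/myfile.csv"
df = (spark.read
      .option("header", "true")       # first row holds the column names
      .option("inferSchema", "true")  # let Spark work out the column types
      .csv(csv_path))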
import pyspark.sql.functions as f

def udf_clean(s):
    # helper that builds a column expression: replace every '3' with 'B'
    return f.translate(s, '3', 'B')

# translate, strip digits with a regex, then trim, on the second column
df.filter(df.category == 'Housing') \
  .select(df[1], f.trim(f.regexp_replace(udf_clean(df[1]), r'(\d+)', ''))) \
  .show()
What I can't find out is how to perform these translations on the whole DataFrame. I would like to cleanse the whole DataFrame in one pass. As Spark works on whole columns, I feel I shouldn't have to iterate through it a row at a time, but rather perform some sort of operation on the whole thing. I understand how to iterate over rows, as in
for row in df.rdd.collect():
    do_something(row)
...but I feel I should be able to do something more efficient across the whole set of fields (and collect() pulls everything back to the driver, which seems wrong for files of this size). Is this thinking correct, and does anyone have any examples please? Many thanks, Richard
Resultant code but not the answer
I have not found an answer to this question, but I thought I would post my code, which, as you will see, is not elegant but works.
from pyspark.sql import functions as f
from pyspark.sql.functions import regexp_replace, translate, trim, lit, udf
from pyspark.sql.types import StringType
# Plain column-expression helpers rather than registered UDFs
def udf_regexclean(s):
    # keep only letters, digits, spaces and a small whitelist of punctuation, then trim
    return trim(regexp_replace(s, r'([^\p{L}\p{Nd} @.():_*\-&+\/,])', ''))

def udf_regexReplace(s):
    # specific substitutions first: £ -> GBR, then a handful of character-level replacements, É -> E
    return regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(s, '£', 'GBR'), '’', ''), ' ', ''), "'t", ''), 'É', 'E')
# Apply both cleaning helpers to each of the 33 columns, one by one
df1 = df.select( udf_regexclean(udf_regexReplace(df[0]))
                ,udf_regexclean(udf_regexReplace(df[1]))
                ,udf_regexclean(udf_regexReplace(df[2]))
                ,udf_regexclean(udf_regexReplace(df[3]))
                ,udf_regexclean(udf_regexReplace(df[4]))
                ,udf_regexclean(udf_regexReplace(df[5]))
                ,udf_regexclean(udf_regexReplace(df[6]))
                ,udf_regexclean(udf_regexReplace(df[7]))
                ,udf_regexclean(udf_regexReplace(df[8]))
                ,udf_regexclean(udf_regexReplace(df[9]))
                ,udf_regexclean(udf_regexReplace(df[10]))
                ,udf_regexclean(udf_regexReplace(df[11]))
                ,udf_regexclean(udf_regexReplace(df[12]))
                ,udf_regexclean(udf_regexReplace(df[13]))
                ,udf_regexclean(udf_regexReplace(df[14]))
                ,udf_regexclean(udf_regexReplace(df[15]))
                ,udf_regexclean(udf_regexReplace(df[16]))
                ,udf_regexclean(udf_regexReplace(df[17]))
                ,udf_regexclean(udf_regexReplace(df[18]))
                ,udf_regexclean(udf_regexReplace(df[19]))
                ,udf_regexclean(udf_regexReplace(df[20]))
                ,udf_regexclean(udf_regexReplace(df[21]))
                ,udf_regexclean(udf_regexReplace(df[22]))
                ,udf_regexclean(udf_regexReplace(df[23]))
                ,udf_regexclean(udf_regexReplace(df[24]))
                ,udf_regexclean(udf_regexReplace(df[25]))
                ,udf_regexclean(udf_regexReplace(df[26]))
                ,udf_regexclean(udf_regexReplace(df[27]))
                ,udf_regexclean(udf_regexReplace(df[28]))
                ,udf_regexclean(udf_regexReplace(df[29]))
                ,udf_regexclean(udf_regexReplace(df[30]))
                ,udf_regexclean(udf_regexReplace(df[31]))
                ,udf_regexclean(udf_regexReplace(df[32]))
                )
# Tag every row with the source file name (blob_filename is defined elsewhere in my notebook)
df2 = df1.withColumn('ScrapedFilename', lit(blob_filename))
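To show what I am hoping for instead of the long select above, here is the sort of one-pass version I have in mind; this is only a sketch, I have not verified it behaves identically, and it assumes every column is a string that should get the same treatment:

# Same cleaning applied to every column via a comprehension, keeping the original column names
cleaned_cols = [udf_regexclean(udf_regexReplace(df[c])).alias(c) for c in df.columns]
df1 = df.select(*cleaned_cols)
df2 = df1.withColumn('ScrapedFilename', lit(blob_filename))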
Richard

