
I am new to Databricks. My task is to read a number of large CSV files (up to 1 GB in size), then validate and clean all fields ready for a PolyBase load into an Azure SQL Data Warehouse. The files are stored in blob storage.

I thought that Databricks and Python would be an approach that would yield sensible performance.

I have used the example QuickStart shown below as a starting point: https://docs.microsoft.com/en-us/azure/azure-databricks/quickstart-create-databricks-workspace-portal
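For context, the files are read in roughly this way (a sketch only; the angle-bracketed names are placeholders, and the storage account key is assumed to already be configured as in the quickstart):

# Sketch only - names in angle brackets are placeholders.
# Assumes the storage account key has been set on the Spark config, e.g.
# spark.conf.set("fs.azure.account.key.<storage-account>.blob.core.windows.net", "<account-key>")
blob_path = "wasbs://<container>@<storage-account>.blob.core.windows.net/<folder>/*.csv"
df = spark.read.csv(blob_path, header=True, inferSchema=False)  # keep everything as strings for cleaning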

I want to perform a number of cleaning replacements on every field, run a regex to filter out any other unwanted characters, and finally trim to remove trailing spaces. I have included a test snippet below which gives a flavour of the type of validation I wish to perform. This example uses a helper function to translate a value, and then a regex to filter unwanted characters, on the sample data from the linked quickstart.

import pyspark.sql.functions as f

# Substitute '3' with 'B' in the given column
def udf_clean(s):
  return f.translate(s, '3', 'B')

df.filter(df.category == 'Housing').select(df[1], f.trim(f.regexp_replace(udf_clean(df[1]), r'(\d+)', ''))).show()

What I can't find out is how to perform these translations on the whole dataframe. I would like to cleanse the whole dataframe in one pass. As the dataframe is column-oriented, I feel I shouldn't have to iterate through it a row at a time, but should be able to apply an operation to the whole thing at once. I understand how to iterate over rows, as in

for row in df.rdd.collect():
    do_something(row)

...but I feel I should be able to do something more efficient across the whole set of fields. Is this thinking correct, and does anyone have any examples, please? Many thanks, Richard

Resultant code but not the answer

I have not found an answer to this question, but I thought I would post my code, which, as you will see, is not elegant but works.

from pyspark.sql import functions as f
from pyspark.sql.functions import regexp_replace, translate, trim, lit, udf, UserDefinedFunction
from pyspark.sql.types import StringType


def udf_regexclean(s):
  # Strip any character that is not a letter, digit, space or one of @ . ( ) : _ * - & + / ,
  # then trim leading/trailing whitespace
  return trim(regexp_replace(s, r'([^\p{L}\p{Nd} @.():_*\-&+\/,])', ''))

def udf_regexReplace(s):
  # Fixed substitutions: '£' -> 'GBR', remove curly apostrophes, remove double spaces,
  # remove occurrences of "'t", and map 'É' to 'E'
  return regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(s, '£', 'GBR'), '’', ''), '  ', ''), "'t", ''), 'É', 'E')


df1 = df.select(udf_regexclean(udf_regexReplace(df[0]))
               ,udf_regexclean(udf_regexReplace(df[1]))
               ,udf_regexclean(udf_regexReplace(df[2]))
               ,udf_regexclean(udf_regexReplace(df[3]))
               ,udf_regexclean(udf_regexReplace(df[4]))
               ,udf_regexclean(udf_regexReplace(df[5]))
               ,udf_regexclean(udf_regexReplace(df[6]))
               ,udf_regexclean(udf_regexReplace(df[7]))
               ,udf_regexclean(udf_regexReplace(df[8]))
               ,udf_regexclean(udf_regexReplace(df[9]))
               ,udf_regexclean(udf_regexReplace(df[10]))
               ,udf_regexclean(udf_regexReplace(df[11]))
               ,udf_regexclean(udf_regexReplace(df[12]))
               ,udf_regexclean(udf_regexReplace(df[13]))
               ,udf_regexclean(udf_regexReplace(df[14]))
               ,udf_regexclean(udf_regexReplace(df[15]))
               ,udf_regexclean(udf_regexReplace(df[16]))
               ,udf_regexclean(udf_regexReplace(df[17]))
               ,udf_regexclean(udf_regexReplace(df[18]))
               ,udf_regexclean(udf_regexReplace(df[19]))
               ,udf_regexclean(udf_regexReplace(df[20]))
               ,udf_regexclean(udf_regexReplace(df[21]))
               ,udf_regexclean(udf_regexReplace(df[22]))
               ,udf_regexclean(udf_regexReplace(df[23]))
               ,udf_regexclean(udf_regexReplace(df[24]))
               ,udf_regexclean(udf_regexReplace(df[25]))
               ,udf_regexclean(udf_regexReplace(df[26]))
               ,udf_regexclean(udf_regexReplace(df[27]))
               ,udf_regexclean(udf_regexReplace(df[28]))
               ,udf_regexclean(udf_regexReplace(df[29]))
               ,udf_regexclean(udf_regexReplace(df[30]))
               ,udf_regexclean(udf_regexReplace(df[31]))
               ,udf_regexclean(udf_regexReplace(df[32]))
               )
df2 = df1.withColumn('ScrapedFilename', lit(blob_filename))
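
For what it is worth, the same select could presumably be generated rather than typed out column by column; the following is an untested sketch reusing the two helpers above:

# Untested sketch: apply the same two helpers to every column in one pass,
# keeping the original column names, then tag each row with the source file.
cleaned = [udf_regexclean(udf_regexReplace(f.col(c))).alias(c) for c in df.columns]
df1 = df.select(*cleaned)
df2 = df1.withColumn('ScrapedFilename', lit(blob_filename))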

Richard


1 Answer


As a sample, I created a simple example to address your need. It does not perform your exact replacements; instead it shows how to create a new dataframe by applying a function to the RDD of an existing dataframe.

First, I created a simple dataframe, as shown in the code and figure below.

import numpy as np
import pandas as pd

# Build a small pandas dataframe of random numbers and convert it to a Spark dataframe
dates = pd.date_range('20130101', periods=6)
df = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
sparkDF = spark.createDataFrame(df)
display(sparkDF)

[Figure: the sample dataframe displayed in the notebook]

Then I defined a function to transform each row, as shown in the code and figure below.

# Transform each row: flag whether A is positive, shift B, double C, and raise D to the fourth power
def udf_clean(row):
  return (row[0] > 0, row[1] + 2, row[2] * 2, row[3] ** 4)

new_rdd = sparkDF.rdd.map(lambda row: udf_clean(row))
new_sparkDF = spark.createDataFrame(new_rdd, list('ABCD'))
display(new_sparkDF)

[Figure: the transformed dataframe displayed in the notebook]
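
As a side note, the same per-column transformations can also be expressed as column expressions (or a column-level UDF) without dropping to the RDD; a minimal sketch, assuming the sparkDF defined above:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

# Built-in column expressions cover these cases without any Python function:
expr_df = sparkDF.select((col('A') > 0).alias('A'),
                         (col('B') + 2).alias('B'),
                         (col('C') * 2).alias('C'),
                         (col('D') ** 4).alias('D'))

# If custom Python logic is needed, a column-level UDF can be used instead:
add_two = udf(lambda x: x + 2, DoubleType())
udf_df = sparkDF.withColumn('B', add_two(col('B')))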

Hope it helps.