IIUC, you can try the following steps without using UDF:
from pyspark.sql.functions import expr, first, collect_list, broadcast, monotonically_increasing_id, flatten
import pandas as pd
df = spark.createDataFrame([
["3030 Whispering Pines Circle, Prosper Texas, US","John"],
["Kalverstraat Amsterdam","Mary"],
["Kalverstraat Amsterdam, Netherlands","Lex"],
["xvcv", "ddd"]
]).toDF("address","name")
Step-1: convert df_regex to a Spark DataFrame df1 and add a unique id to df.
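For reference, this assumes regex.csv is a semicolon-separated file with an iso2 column and a keywords column holding one pattern fragment per row; a minimal sketch of what it might look like (illustrative content only, reconstructed from the patterns shown below):
# iso2;keywords
# US;\bTexas\b
# US;\bProsper\b
# US;\bUS$
# NL;\bAmsterdam\b
# NL;\bNetherlands\b
# NL;\bNL$
# CA;\bAlberta\b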
df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
# adjust keywords to upper case, except chars preceded by a backslash (escape sequences like \b):
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper(), regex=True)
# create regex patterns:
df_regex = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)}).reset_index()
df1 = spark.createDataFrame(df_regex)
df1.show(truncate=False)
+----+---------------------------------------------------------------------------------+
|iso2|keywords |
+----+---------------------------------------------------------------------------------+
|CA |(?m)\bALBERTA\b|\bNOVA SCOTIA\b|\bWHITEHORSE\b|\bCA$ |
|NL |(?m)\bAMSTERDAM\b|\bNETHERLANDS\b|\bNL$ |
|US |(?m)\bARIZONA\b|\bTEXAS\b|\bFLORIDA\b|\bCHICAGO\b|\bAMSTERDAM\b|\bPROSPER\b|\bUS$|
+----+---------------------------------------------------------------------------------+
df = df.withColumn('id', monotonically_increasing_id())
df.show(truncate=False)
+-----------------------------------------------+----+---+
|address |name|id |
+-----------------------------------------------+----+---+
|3030 Whispering Pines Circle, Prosper Texas, US|John|0 |
|Kalverstraat Amsterdam |Mary|1 |
|Kalverstraat Amsterdam, Netherlands |Lex |2 |
|xvcv |ddd |3 |
+-----------------------------------------------+----+---+
Step-2: left-join df1 (the Spark version of df_regex) to df, using rlike as the join condition
df2 = df.alias('d1').join(broadcast(df1.alias('d2')), expr("upper(d1.address) rlike d2.keywords"), "left")
df2.show()
+--------------------+----+---+----+--------------------+
| address|name| id|iso2| keywords|
+--------------------+----+---+----+--------------------+
|3030 Whispering P...|John| 0| US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...|Mary| 1| NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...|Mary| 1| US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...| Lex| 2| NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...| Lex| 2| US|(?m)\bARIZONA\b|\...|
| xvcv| ddd| 3|null| null|
+--------------------+----+---+----+--------------------+
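The broadcast() hint replicates the small pattern table to every executor; since rlike is a non-equi condition, the physical plan is expected to show a broadcast nested-loop join (exact plan output varies by Spark version), which you can verify with:
df2.explain()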
Step-3: count the number of matches of d2.keywords in d1.address by splitting d1.address by d2.keywords, then subtracting 1 from the size of the resulting array:
df3 = df2.withColumn('num_matches', expr("size(split(upper(d1.address), d2.keywords))-1"))
df3.show()
+--------------------+----+---+----+--------------------+-----------+
| address|name| id|iso2| keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John| 0| US|(?m)\bARIZONA\b|\...| 3|
|Kalverstraat Amst...|Mary| 1| NL|(?m)\bAMSTERDAM\b...| 1|
|Kalverstraat Amst...|Mary| 1| US|(?m)\bARIZONA\b|\...| 1|
|Kalverstraat Amst...| Lex| 2| NL|(?m)\bAMSTERDAM\b...| 2|
|Kalverstraat Amst...| Lex| 2| US|(?m)\bARIZONA\b|\...| 1|
| xvcv| ddd| 3|null| null| -2|
+--------------------+----+---+----+--------------------+-----------+
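As a quick sanity check of the split/size counting trick, here is a minimal sketch with a hard-coded address and a shortened pattern (illustrative only):
from pyspark.sql.functions import lit, size, split

# splitting by the pattern yields N+1 pieces for N matches, hence the -1:
spark.range(1) \
    .select((size(split(lit("PROSPER TEXAS, US"), r"(?m)\bTEXAS\b|\bPROSPER\b|\bUS$")) - 1).alias("num_matches")) \
    .show()
# expected: 3 (TEXAS, PROSPER and the trailing US each match once)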
Step-4: use array_repeat to repeat the value of iso2 num_matches times (requires Spark 2.4+):
df4 = df3.withColumn("iso2", expr("array_repeat(iso2, num_matches)"))
df4.show()
+--------------------+----+---+------------+--------------------+-----------+
| address|name| id| iso2| keywords|num_matches|
+--------------------+----+---+------------+--------------------+-----------+
|3030 Whispering P...|John| 0|[US, US, US]|(?m)\bARIZONA\b|\...| 3|
|Kalverstraat Amst...|Mary| 1| [NL]|(?m)\bAMSTERDAM\b...| 1|
|Kalverstraat Amst...|Mary| 1| [US]|(?m)\bARIZONA\b|\...| 1|
|Kalverstraat Amst...| Lex| 2| [NL, NL]|(?m)\bAMSTERDAM\b...| 2|
|Kalverstraat Amst...| Lex| 2| [US]|(?m)\bARIZONA\b|\...| 1|
| xvcv| ddd| 3| []| null| -2|
+--------------------+----+---+------------+--------------------+-----------+
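Note that the unmatched row (num_matches = -2) ends up with an empty array because array_repeat returns an empty array for a non-positive count, as the output above shows; a quick check (illustrative only):
from pyspark.sql.functions import expr
# a non-positive count simply produces an empty array rather than an error:
spark.range(1).select(expr("array_repeat('US', -2)").alias("repeated")).show()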
Step-5: groupby and do the aggregation:
df_new = df4 \
.groupby('id') \
.agg(
first('address').alias('address'),
first('name').alias('name'),
flatten(collect_list('iso2')).alias('countries')
)
df_new.show()
+---+--------------------+----+------------+
| id| address|name| countries|
+---+--------------------+----+------------+
| 0|3030 Whispering P...|John|[US, US, US]|
| 1|Kalverstraat Amst...|Mary| [NL, US]|
| 3| xvcv| ddd| []|
| 2|Kalverstraat Amst...| Lex|[NL, NL, US]|
+---+--------------------+----+------------+
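If you only need distinct country codes per address rather than one entry per match, array_distinct (also Spark 2.4+) can be applied on top; a small variation of the above, not part of the original steps:
from pyspark.sql.functions import array_distinct
# deduplicate the repeated codes, e.g. [US, US, US] -> [US]:
df_new.withColumn('countries', array_distinct('countries')).show()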
Alternative: Step-3 can also be handled by a Pandas UDF:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pandas import Series
import re
@pandas_udf("int", PandasUDFType.SCALAR)
def get_num_matches(addr, ptn):
return Series([ 0 if p is None else len(re.findall(p,s)) for p,s in zip(ptn,addr) ])
df3 = df2.withColumn("num_matches", get_num_matches(expr('upper(address)'), 'keywords'))
df3.show()
+--------------------+----+---+----+--------------------+-----------+
| address|name| id|iso2| keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John| 0| US|(?m)\bARIZONA\b|\...| 3|
|Kalverstraat Amst...|Mary| 1| NL|(?m)\bAMSTERDAM\b...| 1|
|Kalverstraat Amst...|Mary| 1| US|(?m)\bARIZONA\b|\...| 1|
|Kalverstraat Amst...| Lex| 2| NL|(?m)\bAMSTERDAM\b...| 2|
|Kalverstraat Amst...| Lex| 2| US|(?m)\bARIZONA\b|\...| 1|
| xvcv| ddd| 3|null| null| 0|
+--------------------+----+---+----+--------------------+-----------+
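For intuition, the scalar Pandas UDF receives the two columns as aligned pandas Series for each batch; the same per-batch logic in plain Python, with hard-coded sample values:
import re
from pandas import Series

addr = Series(['KALVERSTRAAT AMSTERDAM, NETHERLANDS', 'XVCV'])
ptn = Series(['(?m)\\bAMSTERDAM\\b|\\bNETHERLANDS\\b|\\bNL$', None])
# None patterns (rows with no rlike match in the left join) count as 0 matches:
print([0 if p is None else len(re.findall(p, s)) for p, s in zip(ptn, addr)])   # [2, 0]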
Notes:
- since case-insensitive pattern matching is expensive, we converted all chars of keywords to upper case, except anchors and escaped chars like \b, \B, \A, \z (anything preceded by a backslash).
- just a reminder: patterns used in rlike and regexp_replace are Java-based, while those used in pandas_udf are Python-based, which might lead to slight differences when setting up the patterns in regex.csv.
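One concrete example of such a difference (assuming Python 3.7+, where unknown escapes in patterns raise errors): Java regex supports the end-of-input anchor \z, while Python's re only has \Z, so a keyword written with \z would work in rlike but break the Pandas-UDF variant:
import re
# \z is a valid Java anchor (as used by rlike) but a bad escape for Python's re:
try:
    re.search(r'\bUS\z', '3030 WHISPERING PINES CIRCLE, PROSPER TEXAS, US')
except re.error as e:
    print('Python re rejected the pattern:', e)
print(bool(re.search(r'\bUS\Z', '3030 WHISPERING PINES CIRCLE, PROSPER TEXAS, US')))  # True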
Method-2: using pandas_udf
Since the join and groupby trigger data shuffling, the above method could be slow. Here is one more option for your testing:
df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())
df_ptn = spark.sparkContext.broadcast(
df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()
)
df_ptn.value
#{'CA': '(?m)\\bALBERTA\\b|\\bNOVA SCOTIA\\b|\\bNOVA SCOTIA\\b|\\bWHITEHORSE\\b|\\bCA$',
# 'NL': '(?m)\\bAMSTERDAM\\b|\\bNETHERLANDS\\b|\\bNL$',
# 'US': '(?m)\\bARIZONA\\b|\\bTEXAS\\b|\\bFLORIDA\\b|\\bCHICAGO\\b|\\bAMSTERDAM\\b|\\bPROSPER\\b|\\bUS$'}
# REF: https://stackguides.com/questions/952914/how-to-make-a-flat-list-out-of-list-of-lists
from operator import iconcat
from functools import reduce
import re
from pandas import Series
from pyspark.sql.functions import pandas_udf, PandasUDFType, flatten
def __get_iso2(addr, ptn):
return Series([ reduce(iconcat, [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()]) for s in addr ])
get_iso2 = pandas_udf(lambda x:__get_iso2(x, df_ptn), "array<string>", PandasUDFType.SCALAR)
df.withColumn('iso2', get_iso2(expr("upper(address)"))).show()
+--------------------+----+---+------------+
| address|name| id| iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John| 0|[US, US, US]|
|Kalverstraat Amst...|Mary| 1| [NL, US]|
|Kalverstraat Amst...| Lex| 2|[NL, NL, US]|
| xvcv| ddd| 3| []|
+--------------------+----+---+------------+
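For reference, the reduce(iconcat, ...) call simply flattens the per-country lists built for each address; a plain-Python illustration of a single row:
from functools import reduce
from operator import iconcat

# per-country repeated codes for one address, e.g. 3 US matches, 0 CA, 1 NL:
per_country = [['US', 'US', 'US'], [], ['NL']]
print(reduce(iconcat, per_country))   # ['US', 'US', 'US', 'NL']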
Or return an array of arrays from the pandas_udf (without reduce and iconcat) and flatten it with Spark:
def __get_iso2_2(addr, ptn):
return Series([ [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()] for s in addr ])
get_iso2_2 = pandas_udf(lambda x:__get_iso2_2(x, df_ptn), "array<array<string>>", PandasUDFType.SCALAR)
df.withColumn('iso2', flatten(get_iso2_2(expr("upper(address)")))).show()
Update: to find unique countries, do the following:
def __get_iso2_3(addr, ptn):
return Series([ [k for k,v in ptn.value.items() if re.search(v,s)] for s in addr ])
get_iso2_3 = pandas_udf(lambda x:__get_iso2_3(x, df_ptn), "array<string>", PandasUDFType.SCALAR)
df.withColumn('iso2', get_iso2_3(expr("upper(address)"))).show()
+--------------------+----+--------+
| address|name| iso2|
+--------------------+----+--------+
|3030 Whispering P...|John| [US]|
|Kalverstraat Amst...|Mary|[NL, US]|
|Kalverstraat Amst...| Lex|[NL, US]|
| xvcv| ddd| []|
+--------------------+----+--------+
Method-3: use a list comprehension
Similar to @CronosNull's method, if the list of patterns in regex.csv is manageable, you can handle this with a list comprehension:
from pyspark.sql.functions import size, split, upper, col, array, expr, flatten
df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())
df_ptn = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()
df1 = df.select("*", *[ (size(split(upper(col('address')), v))-1).alias(k) for k,v in df_ptn.items()])
df1.select(*df.columns, flatten(array(*[ expr("array_repeat('{0}',`{0}`)".format(c)) for c in df_ptn.keys() ])).alias('iso2')).show()
+--------------------+----+---+------------+
| address|name| id| iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John| 0|[US, US, US]|
|Kalverstraat Amst...|Mary| 1| [NL, US]|
|Kalverstraat Amst...| Lex| 2|[NL, NL, US]|
| xvcv| ddd| 3| []|
+--------------------+----+---+------------+
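For reference, the intermediate df1 here just adds one integer column per country code (CA, NL, US with this sample regex.csv) holding the per-country match counts; a quick way to inspect it:
# inspect the per-country match counts before they are turned into arrays of repeated codes:
df1.select('address', *df_ptn.keys()).show(truncate=False)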
A comment from Hristo Iliev suggests another pure-Spark route: use df = df_addresses.crossJoin(df_regex) to join the two dataframes, then df.filter(df['address'].rlike(df['keywords'])), and then group on the ID column inherited from df_addresses. This will run entirely in Spark without marshalling data to and from the Python helper processes. Equivalently: df_addresses.join(df_regex, df_addresses['address'].rlike(df_regex['keywords']), 'cross'). – Hristo Iliev