Another way to handle column mapping in PySpark is via a dictionary. A dictionary lets you map the columns of the initial DataFrame to the columns of the final DataFrame through its key/value structure, as shown below:
from pyspark.sql.functions import col
df = spark.createDataFrame([
[1, "John", "2019-12-01 10:00:00"],
[2, "Michael", "2019-12-01 11:00:00"],
[2, "Michael", "2019-12-01 11:01:00"],
[3, "Tom", "2019-11-13 20:00:00"],
[3, "Tom", "2019-11-14 00:00:00"],
[4, "Sofy", "2019-10-01 01:00:00"]
], ["A", "B", "C"])
col_map = {"A": "Z", "B": "X", "C": "Y"}  # old name -> new name
df.select(*[col(k).alias(col_map[k]) for k in col_map]).show()
# +---+-------+-------------------+
# | Z| X| Y|
# +---+-------+-------------------+
# | 1| John|2019-12-01 10:00:00|
# | 2|Michael|2019-12-01 11:00:00|
# | 2|Michael|2019-12-01 11:01:00|
# | 3| Tom|2019-11-13 20:00:00|
# | 3| Tom|2019-11-14 00:00:00|
# | 4| Sofy|2019-10-01 01:00:00|
# +---+-------+-------------------+
Here we map A, B, and C to Z, X, and Y, respectively.
If you want a more modular solution, you can also put everything inside a function:
def transform_cols(mappings, df):
    return df.select(*[col(k).alias(mappings[k]) for k in mappings])
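For instance, a quick usage sketch of this helper could look like the following (it reuses the df and col_map defined above; the renamed_df name is just illustrative):

# Apply the mapping through the helper function and show the result.
renamed_df = transform_cols(col_map, df)
renamed_df.show()
# Produces the same table as before, with columns Z, X, Y.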
Or, for an even more modular approach, use monkey patching to extend the existing functionality of the DataFrame class. Place the following code at the top of your PySpark code (you can also create a mini library and include it in your code when needed):
from pyspark.sql import DataFrame

def transform_cols(self, mappings):
    return self.select(*[col(k).alias(mappings[k]) for k in mappings])

DataFrame.transform = transform_cols
Then call it with:
df.transform(col_map).show()
PS: This can be a convenient way to extend DataFrame functionality: create your own libraries and expose them through the DataFrame class via monkey patching (extension methods, for those familiar with C#).
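As a rough illustration of that idea, a minimal sketch of such a mini library might look like the following (the module name df_extensions.py and the method names are purely hypothetical):

# df_extensions.py -- hypothetical mini library of DataFrame extension methods
from pyspark.sql import DataFrame
from pyspark.sql.functions import col

def transform_cols(self, mappings):
    # Rename columns according to an {old_name: new_name} dictionary.
    return self.select(*[col(k).alias(mappings[k]) for k in mappings])

def drop_cols(self, *columns):
    # Drop several columns at once; just another example of an extension method.
    return self.drop(*columns)

# Attach the functions to DataFrame so they can be called as methods.
DataFrame.transform_cols = transform_cols
DataFrame.drop_cols = drop_cols

After importing the module, you could call df.transform_cols(col_map) or df.drop_cols("Y") directly on any DataFrame. In this sketch the helper is attached under a distinct name (transform_cols) rather than transform, so it does not shadow any attribute that may already exist on DataFrame.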