5 votes

I have an issue where I need to dynamically update columns in a Spark dataframe.

Basically, I need to loop through the column list and, if a column name already occurs earlier in the list, rename that column to its name plus its index.

My attempted code was something like this:

def dup_cols(df):
    for i, icol in enumerate(df.columns):
        for x, xcol in enumerate(df.columns):
            if icol == xcol and i != x:
                df = df.withColumnRenamed(xcol, xcol + '_' + str(x))
    return df

But withColumnRenamed matches by name (xcol here), so it renames every column with that name rather than only the one at index x, which doesn't solve my issue.

Can I change this to rename the column in the dataframe by its index? I have searched around for quite a while and found nothing.

I also cannot convert to a Pandas dataframe, so I would need a Spark/PySpark solution to renaming a specific column by its index only.

Thank you!


2 Answers

7 votes

You can use pyspark.sql.DataFrame.toDF() to rename the columns:

Returns a new DataFrame with the new specified column names

Here is an example:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
    (1, 2, 3),
    (4, 5, 6),
    (7, 8, 9)
]

df = spark.createDataFrame(data, ["a", "b", "a"])
df.printSchema()
#root
# |-- a: long (nullable = true)
# |-- b: long (nullable = true)
# |-- a: long (nullable = true)

Create new names based on your index logic:

new_names = []
counter = {c: -1 for c in df.columns}  # occurrences of each name seen so far
for c in df.columns:
    new_c = c
    counter[c] += 1
    # first occurrence (counter == 0) keeps the bare name; repeats get a suffix
    new_c += str(counter[c]) if counter[c] else ""
    new_names.append(new_c)
print(new_names)
#['a', 'b', 'a1']

Now use toDF() to create a new DataFrame with the new column names:

df = df.toDF(*new_names)
df.printSchema()
#root
# |-- a: long (nullable = true)
# |-- b: long (nullable = true)
# |-- a1: long (nullable = true)
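
If you need this in more than one place, the same logic can be wrapped in a small helper along the lines of the asker's dup_cols (the name dedup_cols is just illustrative):

def dedup_cols(df):
    # Suffix each repeated column name with its occurrence index.
    counter = {c: -1 for c in df.columns}
    new_names = []
    for c in df.columns:
        counter[c] += 1
        new_names.append(c + str(counter[c]) if counter[c] else c)
    return df.toDF(*new_names)

# Applied to the original DataFrame with columns ['a', 'b', 'a'],
# this returns one with columns ['a', 'b', 'a1'].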
-1 votes

Suppose dt is the current DataFrame:

new_columns = []
for i in range(1, len(dt.columns)):
    new_columns.append("new_column_name_" + str(i))  # placeholder for your new names

for c, n in zip(dt.columns[1:], new_columns):
    dt = dt.withColumnRenamed(c, n)
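
For a quick check, applying those loops to an illustrative DataFrame (the names and data here are made up):

dt = spark.createDataFrame([(1, 2, 3)], ["id", "b", "c"])
# ... run the two loops above ...
print(dt.columns)
#['id', 'new_column_name_1', 'new_column_name_2']

Note that withColumnRenamed matches columns by name, so, like the asker's original attempt, it cannot target just one of two identically named columns; the toDF() approach above is the one that handles duplicates.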