4 votes

I have data with the schema below. I want all of the columns to be sorted alphabetically, in a PySpark DataFrame.

root
 |-- _id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)

The code below sorts only the outer columns, not the nested ones.

>>> cols = df.columns
>>> df2=df[sorted(cols)]
>>> df2.printSchema()

The schema after running this code looks like:

root
 |-- _id: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)

(Since _id starts with an underscore, it appears first.)

The schema I want is below (even the columns inside address should be sorted):

root
 |-- _id: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- street: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)

Thanks in advance.

Can you give some sample data to check with? – samkart
@samkart We only need the column names here; we're not going to do anything with the data, so I don't think sample data is needed. – Tula

2 Answers

3 votes

Here is a solution that should work for arbitrarily deeply nested StructTypes and doesn't rely on hard-coding any column names.

To demonstrate, I've created a slightly more complex schema with a second level of nesting inside the address column. Let's suppose your DataFrame schema were the following:

df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)
# |-- address: struct (nullable = true)
# |    |-- pin: integer (nullable = true)
# |    |-- city: string (nullable = true)
# |    |-- zip: struct (nullable = true)
# |    |    |-- last4: integer (nullable = true)
# |    |    |-- first5: integer (nullable = true)
# |    |-- street: string (nullable = true)

Notice the address.zip field, which contains two out-of-order subfields.
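
In case you want to reproduce this, here is a minimal sketch of how such a DataFrame could be created; the sample values are made up:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# One made-up row matching the demo schema above
data = [("1", "John", "Doe", (560001, "Bangalore", (4321, 56001), "MG Road"))]
df = spark.createDataFrame(
    data,
    "_id string, first_name string, last_name string, "
    "address struct<pin:int, city:string, zip:struct<last4:int, first5:int>, street:string>"
)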

You can define a function that recursively steps through your schema and sorts the fields to build a Spark SQL select expression:

from pyspark.sql.types import StructType, StructField

def schemaToSelectExpr(schema, baseField=""):
    select_cols = []
    # Walk the fields of this (sub)schema in alphabetical order
    for structField in sorted(schema, key=lambda x: x.name):
        if structField.dataType.typeName() == 'struct':
            # Recurse into the struct's fields, again in alphabetical order,
            # building fully qualified references such as "address.city"
            subFields = []
            for fld in sorted(structField.jsonValue()['type']['fields'],
                              key=lambda x: x['name']):
                newStruct = StructType([StructField.fromJson(fld)])
                newBaseField = structField.name
                if baseField:
                    newBaseField = baseField + "." + newBaseField
                subFields.extend(schemaToSelectExpr(newStruct, baseField=newBaseField))

            # Rebuild the struct with its fields in sorted order
            select_cols.append(
                "struct(" + ",".join(subFields) + ") AS {}".format(structField.name)
            )
        else:
            # Leaf field: qualify it with its parent path, if any
            if baseField:
                select_cols.append(baseField + "." + structField.name)
            else:
                select_cols.append(structField.name)
    return select_cols

Running this on the DataFrame's schema yields (I've broken the long 'address' string onto two lines for readability):

print(schemaToSelectExpr(df.schema))
#['_id',
#'struct(address.city,address.pin,address.street,
#        struct(address.zip.first5,address.zip.last4) AS zip) AS address',
# 'first_name',
# 'last_name']

Now use selectExpr to sort the columns:

df = df.selectExpr(schemaToSelectExpr(df.schema))
df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- address: struct (nullable = false)
# |    |-- city: string (nullable = true)
# |    |-- pin: integer (nullable = true)
# |    |-- street: string (nullable = true)
# |    |-- zip: struct (nullable = false)
# |    |    |-- first5: integer (nullable = true)
# |    |    |-- last4: integer (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)

-1 votes

You can first flatten your DataFrame with the handy colname.* syntax for struct types. For deeper nesting you can use this: How to flatten a struct in a Spark dataframe?
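
For a single level of nesting, the flattening itself might look like this (just a sketch, assuming the schema from the question):

flat = df.select("_id", "first_name", "last_name", "address.*")
flat.printSchema()
# pin, city and street now appear as top-level columns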

You can then rebuild the address struct column with its fields sorted (the colname.* trick is used below only to get the field names), and finally sort the top-level columns:

from pyspark.sql import functions as F

# get the field names inside the address struct, then rebuild it in sorted order
address_cols = df.select(F.col("address.*")).columns
df = df.withColumn("address",
                   F.struct(*[F.col("address." + c).alias(c) for c in sorted(address_cols)]))
df2 = df[sorted(df.columns)]  # finally sort the top-level columns
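
If this behaves as expected, printing the schema of df2 should show the sorted layout from the question (note that, as in the first answer, the rebuilt struct typically comes out non-nullable):

df2.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- address: struct (nullable = false)
# |    |-- city: string (nullable = true)
# |    |-- pin: integer (nullable = true)
# |    |-- street: string (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)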