
I am trying to profile some data using PySpark dataframes that contain strings, timestamps, integers and floats.

toy df:

sdf1 = 
+---+---+---+
|id1|id2|id3|
+---+---+---+
| 1 |"a"| 4 |
| 2 |"a"| 6 |
| 1 |"a"| 7 |
| 3 |"a"| 9 |
+---+---+---+


sdf2 = 
+---+
|ids|
+---+
|id1|
|id2|
|id3|
+---+

I am trying to achieve the following:

agg_instructions = [F.max(x).alias("{0}".format(x)) for x in sdf1.columns]

sdf3 = sdf2.withColumn("max", sdf1.agg(*agg_instructions))

This should result in the following dataframe. However, it does not work. Is there any workaround?

sdf3 = 
+---+---+
|ids|max|
+---+---+
|id1| 3 |
|id2|"a"|
|id3| 9 |
+---+---+

I get the following error:

AssertionError                            Traceback (most recent call last)
 in ()
      7 agg_instructions = [F.max(x).alias("{0}".format(x)) for x in data_sdf.columns]
      8 
----> 9 sdf3 = sdf2.withColumn("max", sdf1.agg(*agg_instructions))
     10 
     11 test = test.reset_index()

/databricks/spark/python/pyspark/sql/dataframe.py in withColumn(self, colName, col)
   2011 
   2012         """
-> 2013         assert isinstance(col, Column), "col should be Column"
   2014         return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)
   2015 

AssertionError: col should be Column


1 Answer


This is more than you need for what you want to achieve: withColumn expects a Column, whereas agg() returns a new single-row DataFrame, which is why the assertion fails. You can get the desired output from sdf1 alone.
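
A quick check makes the mismatch visible (a minimal sketch, assuming an active spark session and the toy data from the question):

from pyspark.sql import functions as F

sdf1 = spark.createDataFrame(
    [(1, "a", 4), (2, "a", 6), (1, "a", 7), (3, "a", 9)], ["id1", "id2", "id3"]
)

agg_instructions = [F.max(x).alias(x) for x in sdf1.columns]
print(type(sdf1.agg(*agg_instructions)))
# <class 'pyspark.sql.dataframe.DataFrame'> -- a DataFrame, not a Column,
# hence the "col should be Column" assertion raised by withColumn()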

One way is to build an array column holding one struct per column, each containing the column name and its maximum (cast to string so the struct fields share a common type). Then explode the array and select the struct fields.

Here is an example:

from pyspark.sql.functions import array, col, explode, lit, max, struct

data = [(1, "a", 4), (2, "a", 6), (1, "a", 7), (3, "a", 9)]
df = spark.createDataFrame(data, ["id1", "id2", "id3"])

# One struct per column: the column name and its max, cast to string
# so that every struct field has the same type.
agg_instructions = array(
    *[struct(lit(c).alias("ids"), max(col(c)).cast("string").alias("max")) for c in df.columns]
)

df.agg(agg_instructions.alias("agg")) \
  .withColumn("agg", explode(col("agg"))) \
  .select("agg.*") \
  .show(truncate=False)

#+---+---+
#|ids|max|
#+---+---+
#|id1|3  |
#|id2|a  |
#|id3|9  |
#+---+---+
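
If you only have a handful of columns, another option is to collect the single aggregated row to the driver and rebuild a small dataframe from it (a minimal sketch, not part of the approach above, assuming the same df and an active spark session):

from pyspark.sql.functions import col, max

# One max per column, cast to string, collected as a single Row.
row = df.agg(*[max(col(c)).cast("string").alias(c) for c in df.columns]).first()

# Rebuild a two-column dataframe: one row per original column.
sdf3 = spark.createDataFrame([(c, row[c]) for c in df.columns], ["ids", "max"])
sdf3.show()

This only moves one row to the driver, so it is cheap, but the explode-based version keeps everything on the executors.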