2 votes

I'm working with a PySpark DataFrame which looks like this:

+----+----+---+---+---+----+
|   a|   b|  c|  d|  e|   f|
+----+----+---+---+---+----+
|   2|12.3|  5|5.6|  6|44.7|
|null|null|  9|9.3| 19|23.5|
|   8| 4.3|  7|0.5| 21| 8.2|
|   9| 3.8|  3|6.5| 45| 4.9|
|   3| 8.7|  2|2.8| 32| 2.9|
+----+----+---+---+---+----+

To create the above DataFrame:

rdd = sc.parallelize([(2, 12.3, 5, 5.6, 6, 44.7),
                      (None, None, 9, 9.3, 19, 23.5),
                      (8, 4.3, 7, 0.5, 21, 8.2),
                      (9, 3.8, 3, 6.5, 45, 4.9),
                      (3, 8.7, 2, 2.8, 32, 2.9)])
df = sqlContext.createDataFrame(rdd, ('a', 'b', 'c', 'd', 'e', 'f'))
df.show()

I want to create another column 'g' whose values are lists of tuples built from the existing non-null columns. The tuples have the form:

((column a, column b), (column c, column d), (column e, column f))

Requirements for the output column: 1) only consider the non-null columns while creating the list of tuples; 2) return the list of tuples.

So the final DataFrame with column 'g' would be:

+----+----+---+---+---+----+---------------------------+
|   a|   b|  c|  d|  e|   f|                          g|
+----+----+---+---+---+----+---------------------------+
|   2|12.3|  5|5.6|  6|44.7|[[2,12.3],[5,5.6],[6,44.7]]|
|null|null|  9|9.3| 19|23.5|[[9,9.3],[19,23.5]]        |
|   8| 4.3|  7|0.5| 21| 8.2|[[8,4.3],[7,0.5],[21,8.2]] |
|   9| 3.8|  3|6.5| 45| 4.9|[[9,3.8],[3,6.5],[45,4.9]] |
|   3| 8.7|  2|2.8| 32| 2.9|[[3,8.7],[2,2.8],[32,2.9]] |
+----+----+---+---+---+----+---------------------------+

In column "g", the second row has only two pairs instead of three, because the values of columns 'a' and 'b' in that row are null and are therefore omitted.

I'm not sure how to dynamically omit the null values and build the list of tuples.

I tried to partially achieve the final column with a UDF:

from pyspark.sql.functions import udf, array

l1 = ['a', 'c', 'e']
l2 = ['b', 'd', 'f']

def func1(r1, r2):
    # Pair up the i-th elements of the two column groups.
    l = []
    for i in range(len(l1)):
        l.append((r1[i], r2[i]))
    return l

func1_udf = udf(func1)
df = df.withColumn('g', func1_udf(array(l1), array(l2)))
df.show()

I tried declaring the UDF's return type as ArrayType, but it did not work. Any help would be much appreciated. I'm working with PySpark 1.6. Thank you!

Comment (mayank agrawal): you did not find any of the answers useful?

3 Answers

0 votes

I think UDFs should work just fine.

import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, FloatType

rdd = sc.parallelize([(2, 12.3, 5, 5.6, 6, 44.7),
                      (None, None, 9, 9.3, 19, 23.5),
                      (8, 4.3, 7, 0.5, 21, 8.2),
                      (9, 3.8, 3, 6.5, 45, 4.9),
                      (3, 8.7, 2, 2.8, 32, 2.9)])
df = sqlContext.createDataFrame(rdd, ('a', 'b', 'c', 'd', 'e', 'f'))

# Cast every column to float so the pairs have a single, consistent element type.
df = df.select(*(F.col(c).cast("float").alias(c) for c in df.columns))

def combine(a, b, c, d, e, f):
    # Keep a pair only when neither of its values is null.
    combine_ = []
    if None not in [a, b]:
        combine_.append([a, b])
    if None not in [c, d]:
        combine_.append([c, d])
    if None not in [e, f]:
        combine_.append([e, f])
    return combine_

combine_udf = F.udf(combine, ArrayType(ArrayType(FloatType())))
df = df.withColumn('combined', combine_udf(F.col('a'), F.col('b'), F.col('c'),
                                           F.col('d'), F.col('e'), F.col('f')))
df.show()
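As a quick check, the schema of the new column should come out as an array of float arrays; with the float cast above, printSchema() should show roughly the following (exact nullability flags may vary):

df.printSchema()
# ...
# |-- combined: array (nullable = true)
# |    |-- element: array (containsNull = true)
# |    |    |-- element: float (containsNull = true)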
0 votes

You can try something like this:

from pyspark.sql.functions import col, when, array, lit

df.withColumn("g", when(col("a").isNotNull() & col("b").isNotNull(),
                        array(col("a"), col("b"))).otherwise(array(lit("")))) \
  .withColumn("h", when(col("c").isNotNull() & col("d").isNotNull(),
                        array(col("c"), col("d"))).otherwise(array(lit("")))) \
  .withColumn("i", when(col("e").isNotNull() & col("f").isNotNull(),
                        array(col("e"), col("f"))).otherwise(array(lit("")))) \
  .withColumn("concat", array(col("g"), col("h"), col("i"))) \
  .drop('g', 'h', 'i') \
  .show(truncate=False)

Resulting df:

+----+----+---+---+---+----+--------------------------------------------------------------------------+
|a   |b   |c  |d  |e  |f   |concat                                                                    |
+----+----+---+---+---+----+--------------------------------------------------------------------------+
|2   |12.3|5  |5.6|6  |44.7|[WrappedArray(2.0, 12.3), WrappedArray(5.0, 5.6), WrappedArray(6.0, 44.7)]|
|null|null|9  |9.3|19 |23.5|[WrappedArray(), WrappedArray(9.0, 9.3), WrappedArray(19.0, 23.5)]        |
|8   |4.3 |7  |0.5|21 |8.2 |[WrappedArray(8.0, 4.3), WrappedArray(7.0, 0.5), WrappedArray(21.0, 8.2)] |
|9   |3.8 |3  |6.5|45 |4.9 |[WrappedArray(9.0, 3.8), WrappedArray(3.0, 6.5), WrappedArray(45.0, 4.9)] |
|3   |8.7 |2  |2.8|32 |2.9 |[WrappedArray(3.0, 8.7), WrappedArray(2.0, 2.8), WrappedArray(32.0, 2.9)] |
+----+----+---+---+---+----+--------------------------------------------------------------------------+
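Note that rows containing nulls keep an empty WrappedArray() placeholder in this output. If those placeholders should be dropped so the result matches the format asked for, one possible follow-up (a sketch only; the df_concat name and the string element type are assumptions, and it is untested on 1.6) is a small UDF applied to the combined column, after assigning the result of the snippet above to a DataFrame instead of calling show():

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

# Keep only the pairs whose values are all present (not null and not the
# empty-string placeholder); the element type is declared as string here.
drop_empty = F.udf(
    lambda pairs: [[str(v) for v in p] for p in pairs
                   if p and all(v not in (None, "") for v in p)],
    ArrayType(ArrayType(StringType())))

df_concat = df_concat.withColumn("concat", drop_empty(F.col("concat")))
df_concat.show(truncate=False)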
0 votes

Another solution using a UDF:

>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import *

>>> arr_udf = F.udf(lambda row : [x for x in [row[0:2],row[2:4],row[4:6]] if all(x)],ArrayType(ArrayType(StringType())))
>>> df.select("*",arr_udf(F.struct([df[x] for x in df.columns])).alias('g')).show(truncate=False)
+----+----+---+---+---+----+--------------------------------------------------------------------+
|a   |b   |c  |d  |e  |f   |g                                                                   |
+----+----+---+---+---+----+--------------------------------------------------------------------+
|2   |12.3|5  |5.6|6  |44.7|[WrappedArray(2, 12.3), WrappedArray(5, 5.6), WrappedArray(6, 44.7)]|
|null|null|9  |9.3|19 |23.5|[WrappedArray(9, 9.3), WrappedArray(19, 23.5)]                      |
|8   |4.3 |7  |0.5|21 |8.2 |[WrappedArray(8, 4.3), WrappedArray(7, 0.5), WrappedArray(21, 8.2)] |
|9   |3.8 |3  |6.5|45 |4.9 |[WrappedArray(9, 3.8), WrappedArray(3, 6.5), WrappedArray(45, 4.9)] |
|3   |8.7 |2  |2.8|32 |2.9 |[WrappedArray(3, 8.7), WrappedArray(2, 2.8), WrappedArray(32, 2.9)] |
+----+----+---+---+---+----+--------------------------------------------------------------------+
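One caveat with the lambda above: all(x) also drops a pair when one of its values is 0 or 0.0 (both are falsy), and the declared StringType turns the numbers into strings. If that matters, a variant with an explicit None check and a float element type could look like this (a sketch, assuming the columns are numeric):

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType

# Keep a pair only when neither value is None; convert explicitly to float
# so the returned values match the declared FloatType.
arr_udf = F.udf(
    lambda row: [[float(v) for v in p]
                 for p in (row[0:2], row[2:4], row[4:6])
                 if all(v is not None for v in p)],
    ArrayType(ArrayType(FloatType())))

df.select("*", arr_udf(F.struct([df[x] for x in df.columns])).alias('g')).show(truncate=False)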