Problem: I am trying to combine sparse vectors into a single vector per id (the result of an aggregation after grouping rows by id).

The original DataFrame I am working with (and to which I applied the transformations) looks like this:

Input:

+---+-------+--------+--------+
| id|   col1|    col2|    col3|
+---+-------+--------+--------+
|  1|  [Red]|  [John]|  [Male]|
|  1| [Blue]| [Alice]|[Female]|
|  1|[Green]|[Celine]|  [Male]|
|  2|  [Red]|   [Bob]|  [Male]|
|  1|  [Red]|  [John]|  [Male]|
|  2|[Green]| [Alice]|[Female]|
+---+-------+--------+--------+

What I have done so far is two transformations:

In the first step I used CountVectorizer to get one feature vector per column for each row. Output:

+---+-------------+-------------+-------------+
|id |vectors1     |vectors2     |vectors3     |
+---+-------------+-------------+-------------+
|1  |(3,[0],[1.0])|(4,[1],[1.0])|(2,[0],[1.0])|
|1  |(3,[2],[1.0])|(4,[0],[1.0])|(2,[1],[1.0])|
|1  |(3,[1],[1.0])|(4,[2],[1.0])|(2,[0],[1.0])|
|2  |(3,[0],[1.0])|(4,[3],[1.0])|(2,[0],[1.0])|
|1  |(3,[0],[1.0])|(4,[1],[1.0])|(2,[0],[1.0])|
|2  |(3,[1],[1.0])|(4,[0],[1.0])|(2,[1],[1.0])|
+---+-------------+-------------+-------------+

In the second step (based on the previous output), I used VectorAssembler to assemble all these columns into one column named features:

+---+-------------------------+
|id |features                 |
+---+-------------------------+
|1  |(9,[0,4,7],[1.0,1.0,1.0])|
|1  |(9,[2,3,8],[1.0,1.0,1.0])|
|1  |(9,[1,5,7],[1.0,1.0,1.0])|
|2  |(9,[0,6,7],[1.0,1.0,1.0])|
|1  |(9,[0,4,7],[1.0,1.0,1.0])|
|2  |(9,[1,3,8],[1.0,1.0,1.0])|
+---+-------------------------+
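
To make the index layout of the assembled vector explicit: concatenating vectors shifts each input's indices by the combined size of the inputs before it, which is why (3,[0]), (4,[1]) and (2,[0]) become (9,[0,4,7]). Below is a minimal pure-Python sketch of that arithmetic. It writes sparse vectors as plain (size, indices, values) tuples instead of Spark objects, and `assemble` is a hypothetical helper for illustration, not part of the Spark API.

```python
def assemble(*vectors):
    """Concatenate sparse vectors given as (size, indices, values) tuples."""
    offset = 0
    indices, values = [], []
    for size, idx, vals in vectors:
        # Shift this vector's indices past all previously appended vectors
        indices.extend(i + offset for i in idx)
        values.extend(vals)
        offset += size
    return (offset, indices, values)

# First row of the CountVectorizer output above
row = assemble((3, [0], [1.0]), (4, [1], [1.0]), (2, [0], [1.0]))
print(row)  # (9, [0, 4, 7], [1.0, 1.0, 1.0])
```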

Expected Solution/Output/DataFrame: What I am trying to find is a groupBy on id followed by an aggregation function that can transform the second output (or perhaps the first output) into this:

+--+-------------------------------------------------------+
|id|features                                               |
+--+-------------------------------------------------------+
|1 |(9,[0,1,2,3,4,5,7,8],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|2 |            (9,[0,1,3,6,7,8],[1.0,1.0,1.0,1.0,1.0,1.0])|
+--+-------------------------------------------------------+

I might be wrong, but I am most probably looking for something that can sum all the sparse vectors and combine only their internal arrays...

*Optionally, I would also like the aggregation function to be able to count the number of occurrences of each feature across the arrays, so that the features column would instead look like this:

+--+-------------------------------------------------------+
|id|features                                               |
+--+-------------------------------------------------------+
|1 |(9,[0,1,2,3,4,5,7,8],[2.0,1.0,1.0,1.0,2.0,1.0,3.0,1.0])| # 0: 2 times, 4: 2 times, 7: 3 times
|2 |            (9,[0,1,3,6,7,8],[1.0,1.0,1.0,1.0,1.0,1.0])|
+--+-------------------------------------------------------+

Please read the description of the ml tag. - molbdnilo

1 Answer

I had a very similar problem and just found a horrible solution using a UDF.

Starting with the sample you provided:

from pyspark.sql import SparkSession
from pyspark.ml.linalg import SparseVector
import pyspark.sql.functions as F

# Reuse the active session, or create one when running outside a shell/notebook
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    (1, SparseVector(9, [0, 4, 7], [1.0, 1.0, 1.0])),
    (1, SparseVector(9, [2, 3, 8], [1.0, 1.0, 1.0])),
    (1, SparseVector(9, [1, 5, 7], [1.0, 1.0, 1.0])),
    (2, SparseVector(9, [0, 6, 7], [1.0, 1.0, 1.0])),
    (1, SparseVector(9, [0, 4, 7], [1.0, 1.0, 1.0])),
    (2, SparseVector(9, [1, 3, 8], [1.0, 1.0, 1.0])),
], ["id", "features"])

I created this UDF to add vectors:

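from pyspark.ml.linalg import SparseVector, VectorUDT
import numpy as np

@F.udf(returnType=VectorUDT())
def elementwise_sum(vectors):
    # Sum the collected vectors element by element as dense NumPy arrays
    res = None
    for vec in vectors:
        arr = vec.toArray()
        res = arr if res is None else np.add(arr, res)
    # Convert back to a sparse vector, keeping only the non-zero entries
    return SparseVector(len(res), {k: float(v) for k, v in enumerate(res) if v != 0})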

With this you can aggregate the vectors per id and get a single resulting vector:

df = df.groupBy('id').agg(elementwise_sum(F.collect_list('features')).alias('features'))
df.show(10,False)

+---+-------------------------------------------------------+
|id |features                                               |
+---+-------------------------------------------------------+
|1  |(9,[0,1,2,3,4,5,7,8],[2.0,1.0,1.0,1.0,2.0,1.0,3.0,1.0])|
|2  |(9,[0,1,3,6,7,8],[1.0,1.0,1.0,1.0,1.0,1.0])            |
+---+-------------------------------------------------------+
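
The output above corresponds to the counting variant from the question. For the first expected output, where every value is 1.0, one option is to clamp each non-zero sum to 1.0. Here is a minimal pure-Python sketch of both variants, using plain (size, indices, values) tuples in place of Spark vectors. `merge_sparse` and its `binary` flag are hypothetical names for illustration, not part of the Spark API.

```python
from collections import defaultdict

def merge_sparse(vectors, binary=False):
    """Merge same-sized sparse vectors given as (size, indices, values) tuples."""
    size = vectors[0][0]
    totals = defaultdict(float)
    for vec_size, idx, vals in vectors:
        if vec_size != size:
            raise ValueError("all vectors must have the same size")
        for i, v in zip(idx, vals):
            totals[i] += v  # accumulate occurrences per feature index
    indices = sorted(i for i, v in totals.items() if v != 0)
    # binary=True keeps only presence (1.0); otherwise keep the summed counts
    values = [1.0 if binary else totals[i] for i in indices]
    return (size, indices, values)

# The four rows with id 1 from the sample data
id1 = [
    (9, [0, 4, 7], [1.0, 1.0, 1.0]),
    (9, [2, 3, 8], [1.0, 1.0, 1.0]),
    (9, [1, 5, 7], [1.0, 1.0, 1.0]),
    (9, [0, 4, 7], [1.0, 1.0, 1.0]),
]
print(merge_sparse(id1))               # summed counts per index
print(merge_sparse(id1, binary=True))  # presence only
```

Inside the UDF, the same effect could presumably be achieved by storing 1.0 instead of the summed value for each non-zero entry when building the SparseVector.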