4 votes

I have what seems like a simple problem, but I keep banging my head against the wall with no success. I am essentially trying to do the same thing as this post, except that I don't care about the "group by" aspect of that post; I just want to sum over all rows.

To paraphrase the linked post, the DataFrame looks like:

ID,Vec
1,[0,0,5]
2,[3,3,4]
3,[0,8,1]
....

I would like to sum the vectors element-wise.

The desired output of the above example would be a single row:

SumOfVectors
[3,11,10]
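
Just to make the intended semantics concrete, this is the same element-wise sum you would get from plain numpy on the three example rows (illustration only, not the Spark solution I'm after):

import numpy as np

# element-wise sum of the toy rows above, purely to illustrate the desired output
vecs = np.array([[0, 0, 5], [3, 3, 4], [0, 8, 1]])
print(vecs.sum(axis=0))  # [ 3 11 10]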

The other big difference is that I'm using pyspark, not Scala. I tried getting rdd.fold() to work, but either it doesn't work the same or I can't figure out the syntax in pyspark.

One final caveat is that I'm doing this on a DataFrame of ~1 million rows with vectors of length ~10k, so this has to be fairly efficient.

Thanks for any help! A reproducible toy dataframe is below, per comments.

import numpy as np
from pyspark.ml.linalg import Vectors

n_rows = 100

# stack the columns (ID plus three random features) so each row is (ID, x1, x2, x3);
# a plain reshape of the concatenated arrays would scramble the columns
pdf = np.column_stack([np.arange(n_rows),
                       np.random.randn(n_rows),
                       3*np.random.randn(n_rows) + 2,
                       6*np.random.randn(n_rows) - 2])
dff = map(lambda x: (int(x[0]), Vectors.dense(x[1:])), pdf)

df = spark.createDataFrame(dff, schema=["ID", "Vec"])

df.schema should look like StructType(List(StructField(ID,LongType,true),StructField(Vec,VectorUDT,true)))

just printing df gives me DataFrame[ID: bigint, Vec: vector]

Also of possible importance: I'm on Spark 2.4.

$ spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_191
Branch HEAD
Compiled by user ec2-user on 2018-12-07T19:51:27Z
Revision bab859f34a291cb7b3f4e724b59e1b48af69016b
Url [email protected]:/pkg/Aws157BigTop
Type --help for more information.
Comment from pault (score 2): You can use PandasUDF for user defined aggregate functions. Take a look at this post.

2 Answers

5 votes

I think you have to cast the vector column to an array before you can aggregate it.

from pyspark.ml.linalg import Vectors
from pyspark.sql import functions as F
from pyspark.sql import types as T

# UDF that turns a VectorUDT column into a plain array<float> column
def vec2array(v):
    v = Vectors.dense(v)
    return [float(x) for x in v]

vec2array_udf = F.udf(vec2array, T.ArrayType(T.FloatType()))

df = df.withColumn('Vec', vec2array_udf('Vec'))

# number of elements per vector, taken from the first row
n = len(df.select('Vec').first()[0])

# sum each position of the array across all rows, then pack the per-position
# sums back into a single array column
bla = df.agg(F.array(*[F.sum(F.col("Vec")[i]) for i in range(n)]).alias("sum"))
bla.show(truncate=False)
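
If you want the summed array back on the driver as a plain Python list instead of just displaying it, you should be able to pull it out of the single aggregated row via the "sum" alias used above:

# collect the one-row aggregate and extract the array of per-position sums
sum_of_vectors = bla.first()["sum"]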
1 vote

I eventually figured this out (I'm lying, one of my coworkers figured it out for me), so I'll post the answer here in case anyone else has the same issue.

You can use fold, similar to how it's done in the Scala example linked in the original question. The PySpark syntax is like so:

# find out how many elements each vector has, to size the zero array below
vec_df = df.select('Vec')
num_cols = len(vec_df.first().Vec)

# pull the DenseVector out of each Row, then fold to sum each "column"
vec_sums = (vec_df.rdd
            .map(lambda row: row.Vec)
            .fold([0]*num_cols, lambda a, b: [x + y for x, y in zip(a, b)]))

Brief explanation: rdd.fold() takes two arguments. The first is a zero value, in this case [0]*num_cols, which is just a list of zeros. The second is a function that combines the running total with each element of the RDD (each vector, thanks to the map above). So for each vector it does lambda a, b: [x + y for x, y in zip(a, b)], which adds that vector element-wise to what has been computed so far.
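
As a quick sanity check, the same fold applied to the three toy vectors from the question produces the expected result (this assumes a SparkContext handle named sc, e.g. spark.sparkContext):

toy = sc.parallelize([[0, 0, 5], [3, 3, 4], [0, 8, 1]])
print(toy.fold([0, 0, 0], lambda a, b: [x + y for x, y in zip(a, b)]))
# [3, 11, 10]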

You can use my code in the original question to generate a toy dataframe to test this on. Hope that's helpful to someone.