1
votes

I have a Dataframe with two columns: BrandWatchErwaehnungID and word_counts. The word_counts column is the output of `CountVectorizer (a sparse vector). After dropped the empty rows I have created two new columns one with the indices of the sparse vector and one with their values.

help0 = countedwords_text['BrandWatchErwaehnungID','word_counts'].rdd\
    .filter(lambda x : x[1].indices.size!=0)\
    .map(lambda x : (x[0],x[1],DenseVector(x[1].indices) , DenseVector(x[1].values))).toDF()\
    .withColumnRenamed("_1", "BrandWatchErwaenungID").withColumnRenamed("_2", "word_counts")\
    .withColumnRenamed("_3", "word_indices").withColumnRenamed("_4", "single_word_counts")

I needed to convert them to dense vectors before adding to my Dataframe due to spark did not accept numpy.ndarray. My problem is that I now want to explode that Dataframeon the word_indices column but the explode method from pyspark.sql.functions does only support arrays or map as input.

I have tried:

help1 = help0.withColumn('b' , explode(help0.word_indices))

and get the following error:

cannot resolve 'explode(`word_indices')' due to data type mismatch: input to function explode should be array or map type

Afterwards I tried:

help1 = help0.withColumn('b' , explode(help0.word_indices.toArray()))

Which also did not worked... Any suggestions?

1

1 Answers

2
votes

You have to use udf:

from pyspark.sql.functions import udf, explode
from pyspark.sql.types import *
from pyspark.ml.linalg import *

@udf("array<integer>")
def indices(v):
   if isinstance(v, DenseVector):
      return list(range(len(v)))
   if isinstance(v, SparseVector):
      return v.indices.tolist()

df = spark.createDataFrame([
   (1, DenseVector([1, 2, 3])), (2, SparseVector(5, {4: 42}))], 
   ("id", "v"))

df.select("id", explode(indices("v"))).show()

# +---+---+
# | id|col|
# +---+---+
# |  1|  0|
# |  1|  1|
# |  1|  2|
# |  2|  4|
# +---+---+