
I am trying to get the corresponding topic words for the term IDs that I get from the LDA model.

Here is the DataFrame of topics and their word distributions from LDA in Spark:

|topic|         termIndices|         termWeights|
|    0|[0, 39, 68, 43, 5...|[0.06362107696025...|

Since the DataFrame has termIndices and not the actual words, I want to add another column containing the words for the corresponding termIndices.

Since I ran CountVectorizer in Spark, I can use the fitted model to get the vocabulary as a list of words, like below.

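from pyspark.ml.feature import CountVectorizer

# Create a term-frequency vector for each document
cv = CountVectorizer(inputCol="words", outputCol="tf_features", minDF=2.0)
cvModel = cv.fit(df)  # df: the tokenized input DataFrame (name assumed)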

cvModel.vocabulary gives the list of words; the position of each word in that list is its term index.

So here is the UDF I wrote to do the mapping:

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType

def term_to_words(termindices):
    """ To get the corresponding words from term indices """
    return np.array(cvModel.vocabulary)[termindices]

I converted the list to a NumPy array because a NumPy array can be indexed with a list of indices, which a plain Python list cannot.
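To make the indexing difference concrete, here is a small sketch with a made-up toy vocabulary (no Spark involved). It also shows that the NumPy result is an `ndarray`, not a Python list, which matters for what a UDF may return.

```python
import numpy as np

vocabulary = ["spark", "lda", "topic", "word"]  # toy vocabulary, made up for illustration
term_indices = [0, 2, 3]

# NumPy "fancy indexing": a list of positions selects several elements at once.
words = np.array(vocabulary)[term_indices]

# A plain list only accepts an int or a slice; a list of positions raises TypeError.
try:
    vocabulary[term_indices]
except TypeError as exc:
    list_error = exc

# The idiomatic plain-list equivalent is a comprehension.
words_from_list = [vocabulary[i] for i in term_indices]

print(type(words).__name__, words.tolist())
print(words_from_list)
```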

But I get the following error, and I am not sure why, since I am hardly doing anything here:

Py4JError: An error occurred while calling o443.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)


So I tried using a mapper function instead of a UDF:

def term_to_words(row):
    """ Mapper function to get the corresponding words for the term index """
    return (row['topic'], row['termIndices'], row['termWeights'], word_list[row['termIndices']])

This fails with:

/Users/spark2/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    931         # SparkContext#runJob.
    932         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 933         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    934         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))

AttributeError: 'NoneType' object has no attribute 'sc'

There are two different problems here:

  • The CountVectorizer model is a wrapper around a Java object. It cannot be serialized and shipped with the closure, which is also why you cannot use it in a map closure.
  • You cannot return NumPy types from a UDF.
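The first point can be illustrated with a pure-Python analogy (no Spark involved). The class `JavaBackedModel` below is hypothetical: it stands in for a Py4J-backed wrapper and refuses to be pickled, roughly as the model does when PySpark tries to serialize a closure that references it. Copying the vocabulary out into a plain list first gives something that serializes fine.

```python
import pickle


class JavaBackedModel:
    """Hypothetical stand-in for a JVM-backed model such as the CountVectorizer model."""

    def __init__(self, vocabulary):
        self._vocabulary = list(vocabulary)

    @property
    def vocabulary(self):
        # Return a plain Python list copy of the words.
        return list(self._vocabulary)

    def __reduce__(self):
        # Refuse serialization, mimicking a handle to an object that lives in the JVM.
        raise TypeError("cannot pickle a JVM-backed object")


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False if it raises TypeError."""
    try:
        pickle.dumps(obj)
        return True
    except TypeError:
        return False


model = JavaBackedModel(["spark", "lda", "topic"])
print(can_pickle(model))             # False: the wrapper itself cannot be shipped
print(can_pickle(model.vocabulary))  # True: a plain list of str can
```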

You can for example:

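from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def indices_to_terms(vocabulary):
    # vocabulary is a plain Python list, so it can be serialized with the closure
    def to_terms(xs):
        return [vocabulary[int(x)] for x in xs]
    return udf(to_terms, ArrayType(StringType()))

# topics: the topics DataFrame shown in the question (name assumed)
topics.withColumn(
    "topics_words", indices_to_terms(cvModel.vocabulary)("termIndices"))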

If you want to use NumPy arrays, you'll have to call the tolist() method before returning from the UDF.
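A minimal sketch of the NumPy variant, assuming the vocabulary has already been pulled out of the model as a plain list (for example `cvModel.vocabulary`). `make_numpy_mapper` is a hypothetical helper name; it builds only the plain Python function, which you could then wrap as `udf(to_terms, ArrayType(StringType()))` in the same way as above.

```python
import numpy as np


def make_numpy_mapper(vocabulary):
    """Hypothetical helper: map term indices to words via NumPy indexing."""
    # A NumPy array built from plain strings can be pickled with the closure,
    # unlike the JVM-backed model itself.
    vocab_arr = np.array(vocabulary)

    def to_terms(xs):
        # Fancy indexing returns an ndarray; tolist() converts it into a list
        # of plain Python str values, which a UDF can return.
        return vocab_arr[list(xs)].tolist()

    return to_terms


to_terms = make_numpy_mapper(["spark", "lda", "topic", "word"])
print(to_terms([3, 0]))  # ['word', 'spark']
```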


If you can, use StringIndexer and IndexToString instead. StringIndexer creates the term indices for you from a column of terms, and IndexToString looks up the string values given an index (what your *term_to_words* function does). There are code examples for both in the Spark ML feature documentation.