13 votes

Does anyone have an example of data visualization of an LDA model trained using the PySpark library (specifically using pyLDAvis)? I've seen plenty of examples for Gensim and other libraries, but not for PySpark. Specifically, I'm wondering what to pass into the pyLDAvis.prepare() function and how to get it from my LDA model. Here is my code:

 from pyspark.mllib.clustering import LDA, LDAModel
 from pyspark.mllib.feature import IDF
 from pyspark.ml.feature import CountVectorizer
 from pyspark.mllib.linalg import Vectors

 vectorizer = CountVectorizer(inputCol="filtered1", outputCol="features").fit(filtered_final)
 countVectors = vectorizer.transform(filtered_final).select("status_id", "features")
 countVectors.show()
 frequencyVectors = countVectors.rdd.map(lambda vector: vector[1])
 frequencyDenseVectors = frequencyVectors.map(lambda vector: Vectors.dense(vector))
 idf = IDF().fit(frequencyDenseVectors)
 print('fitting complete')
 tfidf = idf.transform(frequencyDenseVectors)
 print("tf idf complete")
 #prepare corpus for LDA: LDA.train expects an RDD of [document id, term vector],
 #so pair each vector with a unique index rather than a constant id
 corpus = tfidf.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
 #train LDA
 ldaModel = LDA.train(corpus, k = 15, maxIterations=100, optimizer="online", docConcentration=2.0, topicConcentration=3.0)
 print("lda model complete")
Agree, nice visualization but no decent example documentation. Have you tried taking a look at the R implementation? – Jonathan

2 Answers

3 votes

I have somehow managed to fit the output of PySpark's LDA to pyLDAvis.
The following code needs a little cleaning, but it works.

from pyspark.ml.feature import StopWordsRemover, Tokenizer, RegexTokenizer, CountVectorizer, IDF
from pyspark.sql.functions import udf, col, size, explode, regexp_replace, trim, lower, lit
from pyspark.sql.types import ArrayType, StringType, DoubleType, IntegerType, LongType
from pyspark.ml.clustering import LDA
import pyLDAvis
import numpy as np  # np is used below but was missing from the original imports


def format_data_to_pyldavis(df_filtered, count_vectorizer, transformed, lda_model):
    # count how often each word occurs over the whole corpus
    word_counts_df = df_filtered.select((explode(df_filtered.words_filtered)).alias("words")).groupby("words").count()
    word_counts = {r['words']: r['count'] for r in word_counts_df.collect()}
    # re-order the counts to match the CountVectorizer vocabulary
    word_counts = [word_counts[w] for w in count_vectorizer.vocabulary]

    data = {
        # topicsMatrix() is vocabSize x k, so transpose to get (n_topics, n_terms)
        'topic_term_dists': np.array(lda_model.topicsMatrix().toArray()).T,
        'doc_topic_dists': np.array([x.toArray() for x in transformed.select(["topicDistribution"]).toPandas()['topicDistribution']]),
        'doc_lengths': [r[0] for r in df_filtered.select(size(df_filtered.words_filtered)).collect()],
        'vocab': count_vectorizer.vocabulary,
        'term_frequency': word_counts}

    return data

def filter_bad_docs(data):
    # Drop documents whose topic distribution is degenerate:
    # all zeros, not summing to 1, or containing NaNs.
    doc_topic_dists_filtered = []
    doc_lengths_filtered = []

    for dist, length in zip(data['doc_topic_dists'], data['doc_lengths']):
        # np.isclose avoids discarding rows that miss 1.0 only by floating-point error
        if np.sum(dist) == 0 or not np.isclose(np.sum(dist), 1.0) or np.isnan(dist).any():
            continue
        doc_topic_dists_filtered.append(dist)
        doc_lengths_filtered.append(length)

    data['doc_topic_dists'] = doc_topic_dists_filtered
    data['doc_lengths'] = doc_lengths_filtered

# This is the only part that you have to implement:
# create a Spark DataFrame named df_filtered that holds the list of cleaned words per document.
# It can be the output of StopWordsRemover, as sketched below.
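
For example, a minimal sketch of how df_filtered could be produced (the input DataFrame df_raw and its "text" column are assumptions; adapt the names to your data):

tokenizer = Tokenizer(inputCol="text", outputCol="words")  # df_raw and "text" are assumed names
df_tokens = tokenizer.transform(df_raw)
remover = StopWordsRemover(inputCol="words", outputCol="words_filtered")
df_filtered = remover.transform(df_tokens)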

# WORD COUNT
count_vectorizer = CountVectorizer(inputCol="words_filtered", outputCol="features", minDF=0.05, maxDF=0.5)
count_vectorizer = count_vectorizer.fit(df_filtered)
df_counted = count_vectorizer.transform(df_filtered)

# TF-IDF
idf = IDF(inputCol="features", outputCol="features_tfidf")
idf_model = idf.fit(df_counted)
df_tfidf = idf_model.transform(df_counted)

# LDA
lda = LDA(k=2, maxIter=20, featuresCol='features_tfidf')
lda_model = lda.fit(df_tfidf)
transformed = lda_model.transform(df_tfidf)

# FORMAT DATA AND PASS IT TO PYLDAVIS
data = format_data_to_pyldavis(df_filtered, count_vectorizer, transformed, lda_model)
filter_bad_docs(data) # needed because, for some reason, some docs come out with all-zero topic vectors or a distribution that does not sum to 1, so I filter those docs.
py_lda_prepared_data = pyLDAvis.prepare(**data)
pyLDAvis.display(py_lda_prepared_data)
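
Note that pyLDAvis.display renders inline in a Jupyter notebook (after calling pyLDAvis.enable_notebook()); outside a notebook you can write the result to a standalone HTML file instead:

pyLDAvis.save_html(py_lda_prepared_data, 'lda_vis.html')  # writes a self-contained HTML page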
0 votes

I haven't used pyLDAvis to visualize PySpark's LDA, but here is an example of how to use the generic pyLDAvis.prepare for scikit-learn, without the library's dedicated pyLDAvis.sklearn.prepare helper.

Here is a link to the source code for pyLDAvis.prepare: https://github.com/bmabey/pyLDAvis/blob/master/pyLDAvis/_prepare.py

def prepare(topic_term_dists, doc_topic_dists, doc_lengths, vocab, term_frequency):
    """Transforms the topic model distributions and related corpus data into
    the data structures needed for the visualization.

    Parameters
    ----------
    topic_term_dists : array-like, shape (n_topics, n_terms)
        Matrix of topic-term probabilities, where n_terms is len(vocab).
    doc_topic_dists : array-like, shape (n_docs, n_topics)
        Matrix of document-topic probabilities.
    doc_lengths : array-like, shape n_docs
        The length of each document, i.e. the number of words in each document.
        The order of the numbers should be consistent with the ordering of the
        docs in doc_topic_dists.
    vocab : array-like, shape n_terms
        List of all the words in the corpus used to train the model.
    term_frequency : array-like, shape n_terms
        The count of each particular term over the entire corpus. The ordering
        of these counts should correspond with `vocab` and `topic_term_dists`.
    """
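
To make those shape requirements concrete, here is a small sanity check (a sketch, assuming numpy is imported as np) that you could run on the five inputs before calling prepare:

n_topics, n_terms = np.shape(topic_term_dists)
assert n_terms == len(vocab) == len(term_frequency)
assert np.shape(doc_topic_dists) == (len(doc_lengths), n_topics)
# every row of both matrices should be a probability distribution
assert np.allclose(np.sum(topic_term_dists, axis=1), 1)
assert np.allclose(np.sum(doc_topic_dists, axis=1), 1)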

Example for sklearn.decomposition.LatentDirichletAllocation (with the imports it needs added):

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import LatentDirichletAllocation
import pyLDAvis

tfidf_vectorizer = TfidfVectorizer(max_df=0.95)
tfidf = tfidf_vectorizer.fit_transform(data)
lda = LatentDirichletAllocation(n_components=10)
lda.fit(tfidf)
# normalize the topic-word weights so each row sums to 1
topic_term_dists = lda.components_ / lda.components_.sum(axis=1)[:, None]
# note: with TF-IDF features these are weighted sums rather than raw counts
doc_lengths = tfidf.sum(axis=1).getA1()
term_frequency = tfidf.sum(axis=0).getA1()
# normalize the per-document topic weights as well
lda_doc_topic_dists = lda.transform(tfidf)
doc_topic_dists = lda_doc_topic_dists / lda_doc_topic_dists.sum(axis=1)[:, None]
vocab = tfidf_vectorizer.get_feature_names()  # use get_feature_names_out() on scikit-learn >= 1.0
lda_pyldavis = pyLDAvis.prepare(topic_term_dists, doc_topic_dists, doc_lengths, vocab, term_frequency)
pyLDAvis.display(lda_pyldavis)