1
votes

I have a corpus of documents that I'm reading into a Spark DataFrame. I have tokenized and vectorized the text, and now I want to feed the vectorized data into an MLlib LDA model. The LDA API docs seem to require the data to be:

rdd – RDD of documents, which are tuples of document IDs and term (word) count vectors. The term count vectors are “bags of words” with a fixed-size vocabulary (where the vocabulary size is the length of the vector). Document IDs must be unique and >= 0.
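If I understand correctly, that means something like this toy example (made-up dense vectors, just to illustrate the expected shape):

from pyspark.mllib.linalg import Vectors

# toy illustration only: (document ID, term count vector) pairs
toy_corpus = sc.parallelize([
    [0, Vectors.dense([1.0, 0.0, 3.0])],
    [1, Vectors.dense([0.0, 2.0, 0.0])],
])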

How can I get from my DataFrame to a suitable RDD?

from pyspark.mllib.clustering import LDA
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import CountVectorizer

#read the data
tf = sc.wholeTextFiles("20_newsgroups/*")

#transform into a data frame
df = tf.toDF(schema=['file','text'])

#tokenize
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenized = tokenizer.transform(df)

#vectorize
cv = CountVectorizer(inputCol="words", outputCol="vectors")
model = cv.fit(tokenized)
result = model.transform(tokenized)

#transform into a suitable rdd
myrdd = ?

#LDA
model = LDA.train(myrdd, k=2, seed=1)

PS: I'm using Apache Spark 1.6.3.

If I may ask, why are you using MLlib's LDA? LDA is available with spark-ml. – eliasah
Just trying to stitch pieces together from several tutorials. Not opposed to taking a different approach. – ADJ
Then I'd advise looking at the official documentation for spark-ml. It's straightforward. Your result value should normally be ready to feed in as is. – eliasah
Just checked my Spark version. It's 1.6 and it doesn't seem to have LDA in pyspark.ml.clustering. – ADJ
Will need to try it after the holiday weekend. Thanks. – ADJ

1 Answer

4
votes

Let's first organize the imports, read the data, do some simple special-character removal, and transform it into a DataFrame:

import re  # needed to remove special characters
from pyspark.sql import Row

from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.mllib.clustering import LDA
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType

pattern = re.compile(r'[\W_]+')

rdd = sc.wholeTextFiles("./data/20news-bydate/*/*/*") \
    .mapValues(lambda x: pattern.sub(' ', x)).cache() # ref. https://stackoverflow.com/a/1277047/3415409

df = rdd.toDF(schema=['file', 'text'])

We will need to add an index to each Row. The following code snippet is inspired by this question about adding primary keys with Apache Spark:

row_with_index = Row(*["id"] + df.columns)

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])

    return _make_row

f = make_row(df.columns)

indexed = (df.rdd
           .zipWithUniqueId()
           .map(lambda x: f(*x))
           .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
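Note that zipWithUniqueId guarantees unique, non-negative ids but not consecutive ones, which is fine here since LDA only requires uniqueness. As a quick sanity check:

# the ids should be unique longs sitting next to the original columns
indexed.select('id', 'file').show(3)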

Once we have added the index, we can proceed to feature cleansing, extraction, and transformation:

# tokenize
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
tokenized = tokenizer.transform(indexed)

# remove stop words
remover = StopWordsRemover(inputCol="tokens", outputCol="words")
cleaned = remover.transform(tokenized)

# vectorize
cv = CountVectorizer(inputCol="words", outputCol="vectors")
count_vectorizer_model = cv.fit(cleaned)
result = count_vectorizer_model.transform(cleaned)
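Optionally, it can help to peek at the fitted vocabulary and the vectorized output at this point (just a sanity check, nothing LDA requires):

# the vocabulary size is the length of every count vector
print(len(count_vectorizer_model.vocabulary))
result.select('id', 'vectors').show(2)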

Now, let's transform the result DataFrame back into an RDD:

corpus = result.select(F.col('id').cast("long"), 'vectors').rdd \
    .map(lambda x: [x[0], x[1]])
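A quick look at one element should confirm we now have (document id, term count vector) pairs, which matches the format the MLlib LDA documentation asks for:

# expected shape: [doc_id, SparseVector]
print(corpus.take(1))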

Our data is now ready for training:

# train the LDA model
lda_model = LDA.train(rdd=corpus, k=10, seed=12, maxIterations=50)
# extract the topics
topics = lda_model.describeTopics(maxTermsPerTopic=10)
# extract the vocabulary
vocabulary = count_vectorizer_model.vocabulary

We can now print the topic descriptions as follows:

for topic_id in range(len(topics)):
    print("topic {} :".format(topic_id))
    words, scores = topics[topic_id]
    for term_index, score in zip(words, scores):
        print(vocabulary[term_index], "->", score)

PS: The above code was tested with Spark 1.6.3.