I'm working on implementing a Spark LDA model (via the Scala API), and am having trouble with the necessary formatting steps for my data. My raw data (stored in a text file) is in the following format, essentially a list of tokens and the documents they correspond to. A simplified example:
doc XXXXX term XXXXX
1 x 'a' x
1 x 'a' x
1 x 'b' x
2 x 'b' x
2 x 'd' x
...
Where the XXXXX columns are garbage data I don't care about. I realize this is an atypical way of storing corpus data, but it's what I have. As I hope is clear from the example, there's one line per token in the raw data (so if a given term appears 5 times in a document, that corresponds to 5 lines of text).
In any case, I need to format this data as sparse term-frequency vectors for running a Spark LDA model, but I'm unfamiliar with Scala and am having some trouble.
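To make the target concrete, here is a minimal plain-Scala sketch (no Spark involved) of the per-document term counts I'm after. The rows are hard-coded from the simplified example above with the garbage columns and quote characters dropped; the names `rows` and `countsByDoc` are just for illustration.

```scala
// Sample rows from the example above as (doc, term) pairs.
val rows = Seq(("1", "a"), ("1", "a"), ("1", "b"), ("2", "b"), ("2", "d"))

// Group by document, then count how often each term occurs within it.
val countsByDoc: Map[String, Map[String, Int]] =
  rows.groupBy(_._1).map { case (doc, docRows) =>
    doc -> docRows.groupBy(_._2).map { case (term, hits) => term -> hits.size }
  }
```

So document 1 should end up with a count of 2 for 'a' and 1 for 'b', and document 2 with a count of 1 each for 'b' and 'd'.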
I start with:
import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
val corpus:RDD[Array[String]] = sc.textFile("path/to/data")
.map(_.split('\t')).map(x => Array(x(0),x(2)))
And then I get the vocabulary data I'll need to generate the sparse vectors:
val vocab: RDD[String] = corpus.map(_(1)).distinct()
val vocabMap: Map[String, Int] = vocab.collect().zipWithIndex.toMap
What I don't know is the proper mapping function to use here such that I end up with a sparse term frequency vector for each document that I can then feed into the LDA model. I think I need something along these lines...
val documents: RDD[(Long, Vector)] = corpus.groupBy(_(0)).zipWithIndex
.map(x =>(x._2,Vectors.sparse(vocabMap.size, ???)))
At which point I can run the actual LDA:
val lda = new LDA().setK(n_topics)
val ldaModel = lda.run(documents)
Basically, I don't know what function to apply to each group so that I can feed term frequency data (presumably as a map?) into a sparse vector. In other words, how do I fill in the ??? in the code snippet above to achieve the desired effect?
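For what it's worth, my best guess is something like the sketch below. It relies on `Vectors.sparse` having an overload that takes a `Seq[(Int, Double)]` of (index, value) pairs. `termCounts` is a hypothetical helper name of my own, not a Spark function, and I have only reasoned through the Spark part (shown in comments), not run it on a cluster.

```scala
// Hypothetical helper (not part of Spark): turns the rows of one document
// into (vocabulary index, count) pairs. Each row is Array(doc, term).
def termCounts(rows: Iterable[Array[String]],
               vocabMap: Map[String, Int]): Seq[(Int, Double)] =
  rows
    .groupBy(row => row(1))                        // term -> all rows with that term
    .map { case (term, hits) => (vocabMap(term), hits.size.toDouble) }
    .toSeq
    .sortBy(_._1)                                  // order by vocabulary index

// Intended use in place of the ??? (assumes a live SparkContext `sc`, as in spark-shell):
// val documents: RDD[(Long, Vector)] = corpus.groupBy(_(0)).zipWithIndex
//   .map { case ((_, rows), id) =>
//     (id, Vectors.sparse(vocabMap.size, termCounts(rows, vocabMap)))
//   }
```

Is this roughly the right shape, or is there a more idiomatic way to do it (for example, avoiding `groupBy` on a large corpus)?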