Not a duplicate of this because I'm asking about what the input is, not what function to call, see below
I followed this guide to create an LDA model in Spark 1.5. I saw in this question that to get the topic distribution of a new document I need to use the LocalLDAModel's topicDistributions function which takes an RDD[(Long, Vector)].
Should the new document vector be a term-count vector? That is the type of vector the LDA is trained with. My code compiles and runs but I'd like to know if this is the intended use of the topicDistributions function
import org.apache.spark.rdd._
import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel, LocalLDAModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import scala.collection.mutable
val input = Seq("this is a document","this could be another document","these are training, not tests", "here is the final file (document)")
val corpus: RDD[Array[String]] = sc.parallelize(input.map{
doc => doc.split("\\s")
})
val termCounts: Array[(String, Long)] = corpus.flatMap(_.map(_ -> 1L)).reduceByKey(_ + _).collect().sortBy(-_._2)
val vocabArray: Array[String] = termCounts.takeRight(termCounts.size).map(_._1)
val vocab: Map[String, Int] = vocabArray.zipWithIndex.toMap
// Convert documents into term count vectors
val documents: RDD[(Long, Vector)] =
corpus.zipWithIndex.map { case (tokens, id) =>
val counts = new mutable.HashMap[Int, Double]()
tokens.foreach { term =>
if (vocab.contains(term)) {
val idx = vocab(term)
counts(idx) = counts.getOrElse(idx, 0.0) + 1.0
}
}
(id, Vectors.sparse(vocab.size, counts.toSeq))
}
// Set LDA parameters
val numTopics = 10
val ldaModel: DistributedLDAModel = new LDA().setK(numTopics).setMaxIterations(20).run(documents).asInstanceOf[DistributedLDAModel]
//create test input, convert to term count, and get its topic distribution
val test_input = Seq("this is my test document")
val test_document:RDD[(Long,Vector)] = sc.parallelize(test_input.map(doc=>doc.split("\\s"))).zipWithIndex.map{ case (tokens, id) =>
val counts = new mutable.HashMap[Int, Double]()
tokens.foreach { term =>
if (vocab.contains(term)) {
val idx = vocab(term)
counts(idx) = counts.getOrElse(idx, 0.0) + 1.0
}
}
(id, Vectors.sparse(vocab.size, counts.toSeq))
}
println("test_document: "+test_document.first._2.toArray.mkString(", "))
val localLDAModel: LocalLDAModel = ldaModel.toLocal
val topicDistributions = localLDAModel.topicDistributions(documents)
println("first topic distribution:"+topicDistributions.first._2.toArray.mkString(", "))