
Not a duplicate of this question, because I'm asking about what the input is, not what function to call; see below.

I followed this guide to create an LDA model in Spark 1.5. I saw in this question that, to get the topic distribution of a new document, I need to use LocalLDAModel's topicDistributions function, which takes an RDD[(Long, Vector)].

Should the new document vector be a term-count vector? That is the type of vector the LDA model is trained with. My code compiles and runs, but I'd like to know whether this is the intended use of the topicDistributions function.
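
Concretely, the input I'm building is an RDD of (document ID, term count vector) pairs, shaped something like this toy example (the name exampleInput and the values are illustrative only; it uses the same imports and SparkContext sc as in the code below):

// one document with ID 0 over a vocabulary of size 5; term 1 appears twice, term 3 once
val exampleInput: RDD[(Long, Vector)] =
    sc.parallelize(Seq((0L, Vectors.sparse(5, Seq((1, 2.0), (3, 1.0))))))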

import org.apache.spark.rdd._
import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel, LocalLDAModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import scala.collection.mutable

val input = Seq("this is a document","this could be another document","these are training, not tests", "here is the final file (document)")
val corpus: RDD[Array[String]] = sc.parallelize(input.map{ 
  doc => doc.split("\\s")
})

// count how often each term occurs across the whole corpus, most frequent first
val termCounts: Array[(String, Long)] = corpus.flatMap(_.map(_ -> 1L)).reduceByKey(_ + _).collect().sortBy(-_._2)

// keep every term (takeRight over the full array is effectively a no-op here) and index the vocabulary
val vocabArray: Array[String] = termCounts.takeRight(termCounts.size).map(_._1)
val vocab: Map[String, Int] = vocabArray.zipWithIndex.toMap

// Convert documents into term count vectors
val documents: RDD[(Long, Vector)] =
    corpus.zipWithIndex.map { case (tokens, id) =>
        val counts = new mutable.HashMap[Int, Double]()
        tokens.foreach { term =>
            if (vocab.contains(term)) {
                val idx = vocab(term)
                counts(idx) = counts.getOrElse(idx, 0.0) + 1.0
            }
        }
        (id, Vectors.sparse(vocab.size, counts.toSeq))
    }
// Set LDA parameters and train the model
val numTopics = 10
val ldaModel: DistributedLDAModel = new LDA().setK(numTopics).setMaxIterations(20).run(documents).asInstanceOf[DistributedLDAModel]

//create test input, convert to term count, and get its topic distribution
val test_input = Seq("this is my test document")
val test_document: RDD[(Long, Vector)] = sc.parallelize(test_input.map(doc => doc.split("\\s"))).zipWithIndex.map { case (tokens, id) =>
    val counts = new mutable.HashMap[Int, Double]()
    tokens.foreach { term =>
        if (vocab.contains(term)) {
            val idx = vocab(term)
            counts(idx) = counts.getOrElse(idx, 0.0) + 1.0
        }
    }
    (id, Vectors.sparse(vocab.size, counts.toSeq))
}
println("test_document: "+test_document.first._2.toArray.mkString(", "))

val localLDAModel: LocalLDAModel = ldaModel.toLocal
val topicDistributions = localLDAModel.topicDistributions(test_document)
println("test document topic distribution: " + topicDistributions.first._2.toArray.mkString(", "))

1 Answer


In the Spark source, I noticed the following comment regarding the documents parameter:

   * @param documents:  
   * RDD of documents, which are term (word) count vectors paired with IDs.
   * The term count vectors are "bags of words" with a fixed-size vocabulary
   * (where the vocabulary size is the length of the vector).
   * This must use the same vocabulary (ordering of term counts) as in training.
   * Document IDs must be unique and >= 0.

So the answer is yes: the new document vector should be a term count vector. Furthermore, the term ordering of the vector must match the vocabulary ordering used in training.
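
For example, a minimal sketch of scoring a new document against your model could look like the following (this reuses the vocab map, sc, numTopics, and localLDAModel from your code; the vectorize helper is just an illustrative name, not part of the Spark API):

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// Reuse the *training* vocabulary so that index i means the same term as it did during training.
def vectorize(doc: String, vocab: Map[String, Int]): Vector = {
    val counts = new scala.collection.mutable.HashMap[Int, Double]()
    doc.split("\\s").foreach { term =>
        vocab.get(term).foreach { idx =>
            counts(idx) = counts.getOrElse(idx, 0.0) + 1.0
        }
    }
    Vectors.sparse(vocab.size, counts.toSeq)
}

// Document IDs must be unique and >= 0; a single test document simply gets ID 0 here.
val newDocs: RDD[(Long, Vector)] =
    sc.parallelize(Seq((0L, vectorize("this is my test document", vocab))))

// Each result row is (document ID, topic distribution vector of length numTopics).
localLDAModel.topicDistributions(newDocs).collect().foreach { case (id, topics) =>
    println("doc " + id + ": " + topics.toArray.mkString(", "))
}

As long as the vector is built from the same vocab map (so every index maps to the same term as in training), this is exactly the fixed-vocabulary "bag of words" representation the scaladoc describes.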