2
votes

So far I have been able to tokenize all of my documents and run them through CountVectorizer and IDF from Spark's MLlib. I am trying to get the top 50 words from each document, but I am not sure how to sort the output of IDF by score.

onePer is a DataFrame of document IDs and tokenized documents.

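// Imports assume Spark 1.x, where spark.ml feature columns hold mllib.linalg.Vector
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.mllib.feature.IDF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

val tf = new CountVectorizer()
  .setInputCol("text")
  .setOutputCol("features").fit(onePer)
  .transform(onePer).select("features").rdd
    .map{x:Row => x.getAs[Vector](0)}

tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)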

This is what the output looks like for one document: a sparse vector printed as (number of words in vocab, ids of the words, word scores). I would like to sort by score and get the top k:

(440,[0,2,3,4,5,6,7,8,9,10,12,15,17,18,19,22,23,24,25,26,27,28,30,31,32,33,34,35,39,41,43,45,47,49,51,52,53,55,57,63,66,69,70,71,74,76,79,80,83,84,85,88,94,95,96,97,99,102,106,107,109,111,117,120,121,124,127,128,129,138,142,145,146,149,154,156,164,166,167,170,171,176,187,189,199,203,204,217,218,219,232,234,236,237,238,240,248,250,251,254,259,263,265,267,280,291,296,302,304,309,319,322,328,333,347,361,364,371,375,384,388,393,395,401,403,433,438,439],[1.3559553712291716,3.9422868018213513,0.6369074622370692,7.795697904781566,3.153829441457081,0.0,5.519201522549892,0.3184537311185346,0.3184537311185346,1.3559553712291716,0.4519851237430572,0.4519851237430572,0.6061358035703155,1.0116009116784799,0.4519851237430572,0.7884573603642703,0.4519851237430572,2.0232018233569597,0.7884573603642703,8.523740461192126,0.6061358035703155,0.6061358035703155,0.6061358035703155,0.6061358035703155,0.7884573603642703,0.6061358035703155,0.6061358035703155,0.6061358035703155,0.7884573603642703,0.7884573603642703,1.0116009116784799,1.0116009116784799,2.0232018233569597,0.7884573603642703,0.7884573603642703,3.897848952390783,0.7884573603642703,0.7884573603642703,1.0116009116784799,5.114244276715276,1.0116009116784799,1.0116009116784799,2.5985659682605218,1.2992829841302609,1.2992829841302609,1.0116009116784799,1.0116009116784799,1.0116009116784799,1.0116009116784799,1.0116009116784799,2.5985659682605218,1.0116009116784799,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,3.4094961844768505,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,3.4094961844768505,1.2992829841302609,1.2992829841302609,1.2992829841302609,3.4094961844768505,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.7047480922384253,1.7047480
922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253])

Update

I was able to get this working by doing the following:

tfidf.map(x => x.toSparse).map{x => x.indices.zip(x.values)
  .sortBy(-_._2)
  .take(10)
  .map(_._1)
}
Why don't you use HashingTF instead of CountVectorizer and set the number of features you want to keep? - eliasah
I need to be able to map back to the original word, so hashing isn't an option for me, unfortunately. - Ashley
You can keep that information in a DataFrame if you use spark-ml. Unfortunately, I have no access to a computer right now to write the example code. - eliasah
@eliasah What information should I keep in a DataFrame? If you are referring to the mapping of words to hashes, I have a strict requirement for no collisions. My concern would still be how to sort the values of IDF, though. - Ashley
I understand your concern. @tuxda's answer should solve your problem then. - eliasah

1 Answer

3
votes

This might help:

scala> val x = (440,Array[Int](0,2,3,4,5,6,7,8,9,10,12,15,17,18,19,22,23,24,25,26,27,28,30,31,32,33,34,35,39,41,43,45,47,49,51,52,53,55,57,63,66,69,70,71,74,76,79,80,83,84,85,88,94,95,96,97,99,102,106,107,109,111,117,120,121,124,127,128,129,138,142,145,146,149,154,156,164,166,167,170,171,176,187,189,199,203,204,217,218,219,232,234,236,237,238,240,248,250,251,254,259,263,265,267,280,291,296,302,304,309,319,322,328,333,347,361,364,371,375,384,388,393,395,401,403,433,438,439),Array[Double](1.3559553712291716,3.9422868018213513,0.6369074622370692,7.795697904781566,3.153829441457081,0.0,5.519201522549892,0.3184537311185346,0.3184537311185346,1.3559553712291716,0.4519851237430572,0.4519851237430572,0.6061358035703155,1.0116009116784799,0.4519851237430572,0.7884573603642703,0.4519851237430572,2.0232018233569597,0.7884573603642703,8.523740461192126,0.6061358035703155,0.6061358035703155,0.6061358035703155,0.6061358035703155,0.7884573603642703,0.6061358035703155,0.6061358035703155,0.6061358035703155,0.7884573603642703,0.7884573603642703,1.0116009116784799,1.0116009116784799,2.0232018233569597,0.7884573603642703,0.7884573603642703,3.897848952390783,0.7884573603642703,0.7884573603642703,1.0116009116784799,5.114244276715276,1.0116009116784799,1.0116009116784799,2.5985659682605218,1.2992829841302609,1.2992829841302609,1.0116009116784799,1.0116009116784799,1.0116009116784799,1.0116009116784799,1.0116009116784799,2.5985659682605218,1.0116009116784799,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,3.4094961844768505,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,3.4094961844768505,1.2992829841302609,1.2992829841302609,1.2992829841302609,3.4094961844768505,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829841302609,1.2992829
841302609,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253,1.7047480922384253))

scala> val (r, indices, values) = x
r: Int = 440
indices: Array[Int] = Array(0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 15, 17, 18, 19, 22, 23, 24, 25, 26, 27, 28, 30, 31, 32, 33, 34, 35, 39, 41, 43, 45, 47, 49, 51, 52, 53, 55, 57, 63, 66, 69, 70, 71, 74, 76, 79, 80, 83, 84, 85, 88, 94, 95, 96, 97, 99, 102, 106, 107, 109, 111, 117, 120, 121, 124, 127, 128, 129, 138, 142, 145, 146, 149, 154, 156, 164, 166, 167, 170, 171, 176, 187, 189, 199, 203, 204, 217, 218, 219, 232, 234, 236, 237, 238, 240, 248, 250, 251, 254, 259, 263, 265, 267, 280, 291, 296, 302, 304, 309, 319, 322, 328, 333, 347, 361, 364, 371, 375, 384, 388, 393, 395, 401, 403, 433, 438, 439)
values: Array[Double] = Array(1.3559553712291716, 3.9422868018213513, 0.6369074622370692, 7.795697904781566, 3.153829441457081, 0.0, 5.519201522549892, 0.3184537311185346, 0.31845373...

scala> val topTermIds = indices.zip(values).sortBy( - _._2).take(50).map(_._1)
topTermIds: Array[Int] = Array(26, 4, 7, 63, 2, 52, 109, 124, 138, 5, 70, 85, 24, 47, 176, 187, 189, 199, 203, 204, 217, 218, 219, 232, 234, 236, 237, 238, 240, 248, 250, 251, 254, 259, 263, 265, 267, 280, 291, 296, 302, 304, 309, 319, 322, 328, 333, 347, 361, 364)
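
If you also want the scores, not just the ids, the same zip-and-sort can return the pairs themselves. The following is only a sketch in plain Scala with no Spark dependency; the object name TopKSketch, the helper topK, and the sample numbers are made up for illustration. Because sortBy is a stable sort, terms with equal scores keep their original index order, which is why the tail of topTermIds above is in ascending id order.

```scala
object TopKSketch {
  // Returns the k highest-scoring (termId, score) pairs, best first.
  // Negating the score turns the ascending sortBy into a descending one.
  def topK(indices: Array[Int], values: Array[Double], k: Int): Array[(Int, Double)] =
    indices.zip(values).sortBy { case (_, score) => -score }.take(k)

  def main(args: Array[String]): Unit = {
    // Made-up sample data in the same (indices, values) layout as above
    val indices = Array(0, 2, 3, 4, 5)
    val values  = Array(1.5, 4.0, 0.5, 8.0, 4.0)
    topK(indices, values, 3).foreach { case (id, score) => println(s"$id -> $score") }
  }
}
```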

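Now plug the code above into a closure, something like:

val topTermsByScore = rdd.map { v: Vector =>
    // Vector itself does not expose indices/values, so convert to a SparseVector first
    val sv = v.toSparse
    // negate the score to sort in decreasing order
    sv.indices.zip(sv.values).sortBy( - _._2).take(50).map(_._1)
}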
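
Since you need the original words back, the term ids can be looked up in the vocabulary array. The sketch below is plain Scala and the names TopWordsSketch and topWords, as well as the sample vocabulary, are invented for illustration. In Spark the array would come from the vocabulary field of the fitted CountVectorizerModel, which assumes you keep the result of fit(onePer) in a val instead of chaining straight into transform as the question does.

```scala
object TopWordsSketch {
  // vocabulary(i) is the word for term id i. In Spark this array would be
  // CountVectorizerModel.vocabulary (assumption: the fitted model was kept).
  def topWords(vocabulary: Array[String],
               indices: Array[Int],
               values: Array[Double],
               k: Int): Array[String] =
    indices.zip(values)
      .sortBy(-_._2)                          // highest score first
      .take(k)
      .map { case (id, _) => vocabulary(id) } // term id -> word

  def main(args: Array[String]): Unit = {
    // Made-up vocabulary and one made-up sparse TF-IDF vector
    val vocabulary = Array("the", "spark", "vector", "idf", "token")
    val indices = Array(0, 1, 3)
    val values  = Array(0.0, 2.5, 4.1)
    println(topWords(vocabulary, indices, values, 2).mkString(", "))
  }
}
```

The lookup runs inside the closure, so on a cluster the vocabulary array is shipped with each task; for a large vocabulary, broadcasting it once may be preferable.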