
I am doing text classification and have built a model using the Pipeline API. I have created the RandomForestClassifier object and set the features column and the label column that I obtained in previous steps (not shown).

I fit the model on training data stored in a DataFrame with the columns "label" and "sentence". The labels are question types. The DataFrame looks like this:

training = sqlContext.createDataFrame([
("DESC:manner", "How did serfdom develop in and then leave Russia ?"),
("DESC:def", "What does '' extended definition '' mean and how would one a paper on it ? "),
("HUM:ind", " Who was The Pride of the Yankees ?")
], ["label", "sentence"])

The pipeline code is:

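from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
# pos, tokenizer, hashingTF, idf and indexer are stages built in earlier steps (not shown)
rf = RandomForestClassifier().setFeaturesCol("features").setLabelCol("idxlabel")
pipeline = Pipeline(stages=[pos, tokenizer, hashingTF, idf, indexer, rf])
model = pipeline.fit(training)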

I can then get predictions with the following code:

prediction = model.transform(test)
selected = prediction.select("sentence","prediction")

The select() call gives me the predicted labels.

For my use case, however, a stream of data arrives from Kinesis and contains only sentences (plain strings). I have to predict the label for each sentence, but I cannot find a predict() function when I run dir(model). Why is there no predict() method for the RandomForestClassifier from pyspark.ml? If there is none, how can I implement this use case? I need a predict() method to satisfy the requirement. Should I use a different ML algorithm instead of RF? Am I doing anything wrong? Any suggestions are appreciated. My environment is Spark 1.6 and Python 2.7.


2 Answers


I figured out that there is no predict() method to use here; instead, predictions are made with the transform() method. Create a new DataFrame that contains only the sentence column (no label column) and pass it to transform(). In my case, I did:

pred = sqlContext.createDataFrame([("What are liver enzymes ?" ,)], ["sentence"])

prediction = model.transform(pred)

The prediction can then be read with the select() method. At least for now, this solution works for me. Please let me know if there is any correction or a better approach.


I am working on the same problem. Can you tell me what the "pos" (part of speech) stage in your pipeline is and how you build it? Also, how are you preparing the test data? Below is my code:

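from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import HashingTF, IDF, StringIndexer, Tokenizer

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
# wordsData/featurizedData are only for inspection; pipeline.fit() applies the same stages itself
wordsData = tokenizer.transform(training)
# numFeatures=20 gives only 20 hash buckets, so many distinct words map to the same feature index
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
idf = IDF(inputCol="rawFeatures", outputCol="features")
indexer = StringIndexer(inputCol="label", outputCol="idxlabel")

rf = RandomForestClassifier().setFeaturesCol("features").setLabelCol("idxlabel")
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, indexer, rf])
model = pipeline.fit(training)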

Please tell me if I am doing anything wrong.