1
votes

I have been experimenting with Spark and MLlib to train a word2vec model, but I don't seem to be getting the performance benefits of distributed machine learning on large datasets. My understanding is this: if I have w workers and I create an RDD with n partitions where n > w, then when I call the fit function of Word2Vec with that RDD as a parameter, Spark should distribute the data uniformly, train separate word2vec models on the w workers, and use some sort of reducer function at the end to merge those w models into a single output model. This should reduce the computation time, since w chunks of data are processed simultaneously rather than one. The trade-off would be some loss of precision, depending on the reducer function used at the end. Does Word2Vec in Spark actually work this way or not? If it does, I might need to play with the configurable parameters.

EDIT

Adding the reason behind this question. I ran Java Spark word2vec code on 10 worker machines and, after going through the documentation, set suitable values for executor-memory, driver-memory and num-executors for a 2.5 GB input text file. The file was mapped to RDD partitions, which were then used as training data for an MLlib word2vec model. The training part took multiple hours, and the number of worker nodes doesn't seem to have much of an effect on the training time. The same code runs successfully on smaller data files (on the order of tens of MBs).

Code

// Configure Spark with Kryo serialization
SparkConf conf = new SparkConf().setAppName("SampleWord2Vec");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(new Class[]{String.class, List.class});
JavaSparkContext jsc = new JavaSparkContext(conf);

// Read the input file and split each line on commas into a list of tokens
JavaRDD<List<String>> jrdd = jsc.textFile(inputFile, 3).map(new Function<String, List<String>>() {
        @Override
        public List<String> call(String s) throws Exception {
            return Arrays.asList(s.split(","));
        }
});
jrdd.persist(StorageLevel.MEMORY_AND_DISK());

// Configure and train the MLlib Word2Vec model
Word2Vec word2Vec = new Word2Vec()
      .setWindowSize(20)
      .setMinCount(20);

Word2VecModel model = word2Vec.fit(jrdd);

// Release the cached RDD and save the trained model
jrdd.unpersist(false);
model.save(jsc.sc(), outputfile);
jsc.stop();
jsc.close();
2
It would help if you shared your code and some more details about how you ran your spark-submit. When you're running, do you see all of your workers active all the time? The Spark history UI will let you dig in. There's a chance that your code is not performant and you're not fully distributing the work. Spark ML includes JavaWord2Vec, which is based on the DataFrames API. This should be very fast. – tadamhicks
Is the Spark ML JavaWord2Vec (DataFrames API) supposed to be better than the MLlib version (JavaRDD API)? I had abandoned the Spark ML version because it was giving some compilation errors when I tried to iterate over the model vectors. – Kabutops
The Catalyst optimizer behind the DataFrames API is way more performant and should be easier. You wouldn't iterate; that is a horribly bad way to use Spark. ML lets you build pipelines that essentially perform a functional map on all values for the column you select. Again, code would help. – tadamhicks
I have updated the question with the problematic portion. I had removed the part where I iterate over the model vectors, but the model training step is taking too much time. The logs print the value of alpha as it goes down from 0.025, and it proceeds very slowly. – Kabutops

2 Answers

4
votes

Judging from the comments, answers and downvotes, I guess I wasn't able to frame my question correctly. But the answer to what I wanted to know is yes: it is possible to train your word2vec model in parallel on Spark. The pull request for this feature was created a long time ago:

https://github.com/apache/spark/pull/1719

In Java, there is a setter method (setNumPartitions) on the Word2Vec object in Spark MLlib. This allows you to train your word2vec model on more than one executor in parallel. As per the comments on the pull request mentioned above:

"To make our implementation more scalable, we train each partition separately and merge the model of each partition after each iteration. To make the model more accurate, multiple iterations may be needed."

Hope this helps someone.

0
votes

I don't see anything inherently wrong with your code. I would highly recommend you consider the DataFrames API, however. As an example, here's a little chart that is frequently thrown around:

[image: performance comparison chart]

Also, I don't know how you may have been "iterating" over elements of the data frame (that's not really how they work). Here's an example from the Spark online docs:

[image: code example from the Spark online docs]

You have the general idea... but you have to parallelize your data as a DataFrame first. It is quite trivial to translate your JavaRDD to a DataFrame instead.

DataFrame fileDF = sqlContext.createDataFrame(jrdd, Model.class);
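
From there, a minimal sketch of fitting the DataFrame-based Word2Vec (org.apache.spark.ml.feature.Word2Vec) might look like the following; the column names "text" and "result" and the vector size are assumptions for illustration, not values taken from your job:

import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.ml.feature.Word2VecModel;

// Assumes fileDF has a column "text" whose values are arrays of tokens
// (the equivalent of the List<String> rows in your JavaRDD).
Word2Vec dfWord2Vec = new Word2Vec()
      .setInputCol("text")
      .setOutputCol("result")
      .setVectorSize(100)
      .setMinCount(20);

Word2VecModel dfModel = dfWord2Vec.fit(fileDF);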

Spark runs a Directed Acyclic Graph (DAG) in lieu of MR, but the concept is the same. Running fit() on your data will indeed run across the data on the workers and then reduce to a single model. But this model will itself be distributed in memory until you decide to write it out.

But, as a trial, how long would it take you to run the same file through, say, NLTK or Word2Vec's native C++ binary?

One last thought... is there a reason you are persisting to memory AND disk? Spark has a native .cache() that persists to memory by default. The power of Spark is doing machine learning on data held in memory... BIG data in memory. If you persist to disk, even with Kryo, you are creating a bottleneck at disk I/O. IMHO the first thing to try would be to get rid of this and persist just to memory. If performance improves, great, but you will find leaps and bounds of performance by leaning on the power of Catalyst through DataFrames.
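
Concretely, that suggestion is just swapping the persist call in your snippet for the memory-only default (this sketch reuses the jrdd variable from your code):

// Memory-only caching; equivalent to persist(StorageLevel.MEMORY_ONLY())
jrdd.cache();

Word2VecModel model = word2Vec.fit(jrdd);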

One thing we didn't discuss is your cluster. It would be helpful to think about things like how much memory per node you have... how many cores per node... whether your cluster is virtualized alongside other apps that are asking for resources (over-provisioned like most vHosts)... Is your cluster in the cloud? Shared or dedicated?

Have you looked at Spark's UI to analyze the runtime operations of the code? What do you see when you run top on the workers while the model is fitting? Can you see full CPU utilization? Have you tried specifying --executor-cores to make sure you are making full use of the CPUs?
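
For reference, a spark-submit invocation along these lines would make the core and executor counts explicit; every value here (class name, master, jar, paths, and the numbers themselves) is a placeholder, not a recommendation:

spark-submit \
  --class your.package.SampleWord2Vec \
  --master yarn \
  --num-executors 10 \
  --executor-cores 4 \
  --executor-memory 8g \
  --driver-memory 4g \
  sample-word2vec.jar hdfs:///path/to/input.csv hdfs:///path/to/model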

I've seen it happen many times that all the work is being done on one core on one worker node. It would be helpful to have this info.

When troubleshooting performance, there are many places to look, including the Spark config files themselves!