I'm relatively new to PySpark and I'm currently trying to implement the SVD algorithm for predicting user-item ratings. The input is a matrix with the columns user_id, item_id and rating. In the first step I initialize the biases (bu, bi) and the factor vectors (pu, qi) for each user and each item, so I start the algorithm with the following dataframe:
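The question doesn't show the initialization code, so here is a minimal sketch of how such a frame could be built. The column names (bu, bi, pu, qi) come from the question; N_FACTORS, the init scale, and the UDF-based wiring are assumptions.

```python
import random

N_FACTORS = 10  # assumption: the latent dimension is not stated in the question

def init_factors(n_factors, scale=0.1):
    """Small random starting vector for one user (pu) or one item (qi)."""
    return [random.gauss(0.0, scale) for _ in range(n_factors)]

# Spark wiring (sketch): add zero biases and random factor columns to the
# ratings frame. Names bu/bi/pu/qi follow the question; the rest is assumed.
# from pyspark.sql import functions as F, types as T
# factor_udf = F.udf(lambda: init_factors(N_FACTORS),
#                    T.ArrayType(T.DoubleType()))
# train = (ratings
#          .withColumn("bu", F.lit(0.0))
#          .withColumn("bi", F.lit(0.0))
#          .withColumn("pu", factor_udf())
#          .withColumn("qi", factor_udf()))
```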
In the current case the number of partitions is 7 and counting all the rows takes 0.7 seconds. The number of rows is 2.5 million.
In the next step I add a column, error, to my dataframe. I use a UDF which calculates the error for each row from all the other columns (I don't think the exact equation is relevant). Afterwards, the count takes about the same amount of time.
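For concreteness, a plausible per-row error in this setting is the usual SVD residual; the question deliberately omits its equation, so the global mean mu and the dot-product form below are assumptions:

```python
def compute_error(rating, mu, bu, bi, pu, qi):
    """Residual between the observed rating and the SVD prediction
    mu + bu + bi + dot(pu, qi). The actual equation in the question is
    unstated; this is the standard form."""
    pred = mu + bu + bi + sum(p * q for p, q in zip(pu, qi))
    return rating - pred

# Spark wiring (sketch): attach the residual as the "error" column.
# from pyspark.sql import functions as F, types as T
# err_udf = F.udf(compute_error, T.DoubleType())
# train = train.withColumn(
#     "error", err_udf("rating", F.lit(global_mean), "bu", "bi", "pu", "qi"))
```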
Now comes the tricky part. I have to create two new dataframes: in the first I group together all the users (named userGroup) and in the second all the items (named itemGroup). I have another UDF that updates the biases (update_b) and one that updates the factor vectors (update_factor_F). The userGroup dataframe has 1.5 million rows and itemGroup has 72,000 rows.
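The question doesn't show what update_b does, so here is a hedged sketch of one common form (an SGD-style step over each user's collected errors); the learning rate, regularization, and the groupBy/collect_list wiring are all assumptions, and update_factor_F would be analogous on the pu/qi arrays:

```python
LR, REG = 0.005, 0.02  # assumptions: learning rate / regularization not given

def update_b(b, errors):
    """SGD-style bias update over all errors collected for one user (or item).
    The real update_b in the question may differ; this is a common form."""
    for e in errors:
        b += LR * (e - REG * b)
    return b

# Spark wiring (sketch): one row per user, carrying its current bias and the
# list of that user's per-rating errors. The names userGroup/update_b follow
# the question; the aggregation shape is an assumption.
# from pyspark.sql import functions as F, types as T
# update_b_udf = F.udf(update_b, T.DoubleType())
# userGroup = (train.groupBy(F.col("u_id").alias("u_id_ug"))
#              .agg(F.first("bu").alias("bu"),
#                   F.collect_list("error").alias("errs"))
#              .withColumn("bu", update_b_udf("bu", "errs")))
```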
Updated biases and factors for each user
I then take the initial dataframe and join it first by user: I take the u_id, i_id and rating from the initial dataframe and the biases and factors from the userGroup dataframe. I repeat the same process with the itemGroup.
train = train.join(userGroup, train.u_id == userGroup.u_id_ug, 'outer') \
.select(train.u_id, train.i_id, train.rating, userGroup.bu, userGroup.pu)
I end up with a dataframe of the same size as the initial one. However, a .count() now takes around 8 seconds. I have to repeat the above steps iteratively, and each iteration makes the .count() action even slower.
I know the issue lies in the join of the dataframes and have searched for solutions. So far I have tried different combinations of partitioning (I used .repartition(7, "u_id") on the userGroup dataframe) to try and match the number of partitions. I also tried repartitioning the final dataframe, but the .count() time remains high.
My goal is to not lose performance after each iteration.