
I'm relatively new to PySpark and I'm currently trying to implement the SVD algorithm for predicting user-item ratings. The input is a matrix with the columns user_id, item_id and rating. In the first step I initialize the biases (bu, bi) and the factor matrices (pu, qi) for each user and each item, so I start the algorithm with the following dataframe:

[Image: initial dataframe]
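For illustration, that initialization step might look roughly like the sketch below; the column names (bu, bi, pu, qi) are from the question, while the number of factors, the random initialization and the tiny stand-in data are assumptions.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
n_factors = 10   # assumed number of latent factors

# The real input is the 2.5M-row (u_id, i_id, rating) dataframe; a tiny stand-in here.
train = spark.createDataFrame(
    [(1, 10, 4.0), (1, 20, 3.5), (2, 10, 5.0)],
    ["u_id", "i_id", "rating"])

# Initialize biases to zero and factor vectors to small random values, per row
train = (train
         .withColumn("bu", F.lit(0.0))
         .withColumn("bi", F.lit(0.0))
         .withColumn("pu", F.array(*[F.rand() * 0.1 for _ in range(n_factors)]))
         .withColumn("qi", F.array(*[F.rand() * 0.1 for _ in range(n_factors)])))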

In the current case the dataframe has 7 partitions, and counting all 2.5 million rows takes about 0.7 seconds.

[Image: partitions and count time]

In the next step I add a column to my dataframe: error. I use a UDF that calculates the error for each row from the other columns (I don't think the exact equation is relevant). Afterwards, the count takes about the same amount of time.
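Since the exact error equation is not shown, the sketch below assumes a plain SVD-style residual, rating - (bu + bi + pu·qi), just to illustrate the UDF step.

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# Hypothetical error UDF; the actual equation in the question is not shown,
# so a simple SVD-style residual is assumed here.
@F.udf(returnType=DoubleType())
def compute_error(rating, bu, bi, pu, qi):
    pred = bu + bi + sum(p * q for p, q in zip(pu, qi))
    return float(rating - pred)

train = train.withColumn("error", compute_error("rating", "bu", "bi", "pu", "qi"))
train.count()   # roughly as fast as the initial count, per the question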

Now comes the tricky part. I have to create 2 new dataframes. In the first I group together all the users (named userGroup) and in the second I group together all the items (named itemGroup). I have another UDF that updates the biases (update_b) and one that updates the factor matrices (update_factor_F). The userGroup dataframe has 1.5 million rows and the itemGroup has 72,000 rows.

[Image: updated biases and factors for each user]
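The grouping step might look roughly like this; the bodies of update_b and update_factor_F are not shown in the question, so simple aggregates stand in as placeholders, and the key names (u_id_ug, i_id_ig) are assumptions.

from pyspark.sql import functions as F

# Per-user and per-item grouping; F.first(...) is only a placeholder for the
# question's update_b / update_factor_F UDFs.
userGroup = (train
             .groupBy(F.col("u_id").alias("u_id_ug"))
             .agg(F.first("bu").alias("bu"),      # placeholder for update_b(...)
                  F.first("pu").alias("pu")))     # placeholder for update_factor_F(...)

itemGroup = (train
             .groupBy(F.col("i_id").alias("i_id_ig"))
             .agg(F.first("bi").alias("bi"),
                  F.first("qi").alias("qi")))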

I then take the initial dataframe and join it first by user: I take u_id, i_id and rating from the initial dataframe, and the biases and factors from the userGroup dataframe. I repeat the same process with the itemGroup.

train = train.join(userGroup, train.u_id == userGroup.u_id_ug, 'outer') \
                .select(train.u_id, train.i_id, train.rating, userGroup.bu, userGroup.pu)
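The item-side join is not shown in the question; presumably it mirrors the user-side one, roughly as sketched below (the i_id_ig key name and the selected columns are assumptions).

# Assumed item-side counterpart of the user join above
train = (train.join(itemGroup, train.i_id == itemGroup.i_id_ig, 'outer')
              .select(train.u_id, train.i_id, train.rating,
                      train.bu, train.pu,
                      itemGroup.bi, itemGroup.qi))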

I end up with a dataframe of the same size as the initial one. However, a .count() on it now takes around 8 seconds. I have to repeat the above steps iteratively, and each iteration slows the .count() action down even further.

I know the issue lies in the join of the dataframes and I have searched for solutions. So far I have tried different combinations of partitioning (I used .repartition(7, "u_id") on the userGroup dataframe) to try to match the number of partitions. I also tried repartitioning the final dataframe, but the .count() time remains high.
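For reference, the repartitioning attempt described above would look roughly like this; the userGroup key name is assumed from the join condition.

# Repartition both sides of the join on their keys so they have the same
# number of partitions (7, as in the question).
userGroup = userGroup.repartition(7, "u_id_ug")
train = train.repartition(7, "u_id")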

My goal is to not lose performance after each iteration.


1 Answer


As some of your dataframes are used multiple times, you will want to cache them so that they are not re-evaluated every time you need them. To do this you can rely on the cache() or persist() operations.
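For example, caching the grouped dataframes that are reused in the joins might look roughly like this; which dataframes to cache and which storage level to use are judgment calls for your workload.

from pyspark import StorageLevel

# Cache dataframes that are reused across joins/iterations
userGroup = userGroup.cache()                                 # memory only
itemGroup = itemGroup.persist(StorageLevel.MEMORY_AND_DISK)   # spill to disk if needed

userGroup.count()   # an action materializes the cache
itemGroup.count()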

Also, the logical plan of your dataframe keeps growing as your iterative algorithm moves forward, which makes each iteration increasingly expensive to recompute. To cope with this, rely on the checkpoint() operation to regularly break the lineage of your dataframes.
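A rough sketch of that, assuming a checkpoint directory and an every-few-iterations interval (both are choices you would tune):

spark.sparkContext.setCheckpointDir("/tmp/svd_checkpoints")   # directory is an assumption

n_iterations = 20                 # hypothetical iteration count
for i in range(n_iterations):
    # ... the error / update / join steps from the question go here ...
    if (i + 1) % 5 == 0:          # checkpointing every few iterations is a tuning choice
        train = train.checkpoint()   # eager by default: materializes and truncates the plan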