I'm working on an algorithm that requires math operations on large matrices. The algorithm involves the following steps:
Inputs: two vectors u and v of size n.
1. For each vector, compute the pairwise Euclidean distances between its elements. Return two matrices E_u and E_v.
2. Apply a function f to each entry of the two matrices. Return two matrices M_u and M_v.
3. Find the eigenvalues and eigenvectors of M_u. Return e_i and ev_i for i = 0,...,n-1.
4. Compute the outer product of each eigenvector. Return the matrices O_i = ev_i*transpose(ev_i), i = 0,...,n-1.
5. Adjust each eigenvalue: e_i = e_i + delta_i, where delta_i = (sum of all elements of the elementwise product of O_i and M_v)/(2*mu), and mu is a parameter.
6. Finally, return the matrix A = elementwise sum of e_i*O_i over i = 0,...,n-1, using the adjusted e_i's.
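For reference, here is the whole computation as a single-machine Breeze sketch (f, u, v, and mu are placeholders, and I treat the distance between scalar elements as the absolute difference). It only works for small n, which is exactly the problem:

import breeze.linalg.{DenseMatrix, DenseVector, eigSym, sum}
import breeze.linalg.eigSym.EigSym

// Single-machine reference; f and mu stand in for the real function/parameter.
def referenceA(u: DenseVector[Double], v: DenseVector[Double],
               f: Double => Double, mu: Double): DenseMatrix[Double] = {
  val n = u.length
  // Steps 1-2: pairwise distances, then f applied entrywise
  val M_u = DenseMatrix.tabulate(n, n)((i, j) => f(math.abs(u(i) - u(j))))
  val M_v = DenseMatrix.tabulate(n, n)((i, j) => f(math.abs(v(i) - v(j))))
  // Step 3: eigendecomposition of the symmetric M_u (columns of evs are ev_i)
  val EigSym(e, evs) = eigSym(M_u)
  val A = DenseMatrix.zeros[Double](n, n)
  for (i <- 0 until n) {
    val ev_i = evs(::, i)
    val O_i = ev_i * ev_i.t                       // step 4: outer product
    val delta_i = sum(O_i *:* M_v) / (2 * mu)     // step 5: eigenvalue adjustment
    A += O_i * (e(i) + delta_i)                   // step 6: accumulate A
  }
  A
}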
The issue I'm facing is mainly memory when n is large (15000 or more), since all the matrices involved are dense. My current implementation may not be the best, and it only partially works.
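For scale: a single dense 15000 x 15000 matrix of doubles already takes 15000^2 * 8 bytes, about 1.8 GB, and materializing all n outer products O_i at once would need roughly n times that.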
I use a RowMatrix for M_u and get its eigendecomposition via SVD (M_u is symmetric, so its singular vectors are eigenvectors). The U factor of the SVD is a row matrix whose columns are the ev_i's, so I have to manually transpose it so that its rows become the ev_i's. The resulting vector e holds the eigenvalues e_i.
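In code, this setup looks roughly like the following (a sketch, not my exact code; the groupByKey at the end is the manual transpose):

import breeze.linalg.DenseVector

val svd = mU.computeSVD(n, computeU = true)  // mU: RowMatrix holding M_u
val e = svd.s.toArray                        // eigenvalues e_i
// Columns of U are the ev_i's: re-key every entry by its column index and
// regroup, so each record becomes (eigenVecId i, ev_i as a dense vector).
val U = svd.U.rows.zipWithIndex.flatMap { case (row, j) =>
  row.toArray.zipWithIndex.map { case (x, i) => (i.toLong, (j, x)) }
}.groupByKey()
 .mapValues(entries => DenseVector(entries.toSeq.sortBy(_._1).map(_._2).toArray))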
Since a previous attempt to directly map each row ev_i to O_i failed with out-of-memory errors, I'm currently doing:
R = U.map { case (i, ev_i) =>
  (i, ev_i.toArray.zipWithIndex)  // add an index for each element in the vector
}
.flatMapValues(x => x)            // one record per (eigenVecId, (vecElement, elementId))
.join(U)                          // the full eigenvector is appended
.map { case (eigenVecId, ((vecElement, elementId), eigenVec)) =>
  (elementId, (eigenVecId, vecElement * eigenVec))  // row elementId of O_eigenVecId
}
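This works because row j of O_i = ev_i*transpose(ev_i) is simply ev_i(j)*ev_i, so R ends up keyed by row index j with values (i, row j of O_i). A quick local check of that identity:

import breeze.linalg.DenseVector

val ev = DenseVector(0.6, 0.8)    // hypothetical eigenvector
val O = ev * ev.t                 // full outer product
val j = 1
assert(O(j, ::).t == ev * ev(j))  // row j of O equals ev(j) * ev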
To compute the adjusted e_i's in step 5 above, M_v is stored as an RDD of tuples (i, denseVector). Then:
deltaRdd = R.join(M_v)  // match row j of O_i with row j of M_v
  .map { case (j, ((i, row_j_of_O_i), row_j_of_M_v)) =>
    (i, row_j_of_O_i.t * DenseVector(row_j_of_M_v.toArray) / (2 * mu))
  }
  .reduceByKey(_ + _)   // sum the per-row partial dot products into delta_i
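This relies on the sum of an elementwise product decomposing row-wise: sum(O_i .* M_v) equals the sum over j of dot(row j of O_i, row j of M_v), so each joined pair of rows contributes one scalar and reduceByKey accumulates delta_i. A toy check of that decomposition:

import breeze.linalg.{DenseMatrix, sum}

val O = DenseMatrix((1.0, 2.0), (3.0, 4.0))  // stand-in for O_i
val M = DenseMatrix((5.0, 6.0), (7.0, 8.0))  // stand-in for M_v
val whole = sum(O *:* M)                     // elementwise product, summed
val byRows = (0 until O.rows).map(j => O(j, ::) * M(j, ::).t).sum
assert(whole == byRows)                      // both are 70.0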
Finally, to compute A, again due to memory issues, I have to first join rows from different RDDs and then reduce by key. Specifically:
R_rearranged = R.map { case (j, (i, row_j_of_O_i)) => (i, (j, row_j_of_O_i)) }
termsForA = R_rearranged.join(deltaRdd)
A = termsForA.map { case (i, ((j, row_j_of_O_i), delta_i)) =>
    (j, (delta_i + e(i)) * row_j_of_O_i)  // row j's contribution from O_i
  }
  .reduceByKey(_ + _)                     // sum over i to get row j of A
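So A comes back as an RDD of (j, row j of A). For completeness, assembling it locally would be something like the following (hypothetical, and only feasible for small n; at n = 15000 one dense matrix is already ~1.8 GB):

import breeze.linalg.DenseMatrix

val localRows = A.collect().sortBy(_._1)  // (j, row_j_of_A), sorted by row index
val A_local = DenseMatrix.tabulate(n, n)((j, k) => localRows(j)._2(k))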
The implementation above works up to termsForA: if I execute an action on termsForA, like termsForA.take(1).foreach(println), it succeeds. But if I execute an action on A, like A.count(), an OOM error occurs on the driver.
I tried tuning Spark's configuration to increase the driver memory as well as the parallelism level, but all attempts failed.
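For reference, the tuning was along these lines (example values, not the exact ones I used):

spark-submit --driver-memory 16g \
  --conf spark.default.parallelism=400 \
  ...  # rest of the submit command unchanged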