
I'm working on an algorithm that requires math operations on large matrices. The algorithm involves the following steps:

Inputs: two vectors u and v of size n

  1. For each vector, compute the pairwise Euclidean distances between its elements. Return two matrices E_u and E_v

  2. For each entry in the two matrices, apply a function f. Return two matrices M_u, M_v

  3. Find the eigenvalues and eigenvectors of M_u. Return e_i, ev_i for i = 0, ..., n-1

  4. Compute the outer product of each eigenvector. Return matrices O_i = ev_i * transpose(ev_i), i = 0, ..., n-1

  5. Adjust each eigenvalue with e_i = e_i + delta_i, where delta_i = (sum of all elements of the elementwise product of O_i and M_v) / (2 * mu), and mu is a parameter

  6. Finally, return the matrix A = elementwise sum of e_i * O_i over i = 0, ..., n-1
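For reference, the whole pipeline can be sketched locally in plain Scala on a small n. Step 3's eigensolver is omitted (the eigenpairs are taken as inputs), f is left unspecified in the question so exp(-x) stands in for it here, and all function names are made up:

```scala
// Step 1: pairwise Euclidean distances between the (scalar) elements of u,
// which for scalars is just |a - b|.
def pairwiseDistances(u: Array[Double]): Array[Array[Double]] =
  u.map(a => u.map(b => math.abs(a - b)))

// Step 2: apply f entrywise. f is a placeholder; the question does not specify it.
def applyF(e: Array[Array[Double]], f: Double => Double): Array[Array[Double]] =
  e.map(_.map(f))

// Step 4: outer product O_i = ev_i * transpose(ev_i).
def outer(v: Array[Double]): Array[Array[Double]] =
  v.map(a => v.map(b => a * b))

// Step 5: delta_i = (sum of elementwise product of O_i and M_v) / (2 * mu).
def delta(o: Array[Array[Double]], mv: Array[Array[Double]], mu: Double): Double =
  o.zip(mv).map { case (or, mr) => or.zip(mr).map { case (a, b) => a * b }.sum }.sum / (2 * mu)

// Steps 4-6 combined: A = sum over i of (e_i + delta_i) * O_i,
// taking the eigenpairs of M_u as given.
def assemble(eigs: Array[Double], evs: Array[Array[Double]],
             mv: Array[Array[Double]], mu: Double): Array[Array[Double]] = {
  val n = eigs.length
  val a = Array.fill(n, n)(0.0)
  for (i <- 0 until n) {
    val o  = outer(evs(i))
    val ei = eigs(i) + delta(o, mv, mu)
    for (r <- 0 until n; c <- 0 until n) a(r)(c) += ei * o(r)(c)
  }
  a
}
```

This materializes every O_i, which is exactly what does not fit in memory at n = 15000; it is only meant to pin down the arithmetic.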

The issue I'm facing is mainly memory when n is large (15000 or more), since all the matrices here are dense. My current implementation may not be the best, and it only partially works.

I used a RowMatrix for M_u and obtained its eigendecomposition via SVD.

The resulting U factor of the SVD is a RowMatrix whose columns are the ev_i's, so I have to transpose it manually so that its rows become the ev_i's. The resulting singular-value vector gives the eigenvalues e_i.

Since a previous attempt to directly map each row ev_i to O_i failed with an out-of-memory error, I'm currently doing:

val R = U.map { case (i, ev_i) =>
    (i, ev_i.toArray.zipWithIndex)      // index each element within the eigenvector
  }
  .flatMapValues(x => x)                // one record per (element value, element index)
  .join(U)                              // append the full eigenvector
  .map { case (eigenVecId, ((vecElement, elementId), eigenVec)) =>
    (elementId, (eigenVecId, vecElement * eigenVec))  // row elementId of O_eigenVecId
  }
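The idea behind R can be checked locally: the j-th row of O_i = ev_i * transpose(ev_i) is just the scalar ev_i(j) times ev_i, so O_i never needs to be materialized as a whole. Each record of R is one row of one outer product, keyed by its row index j. A plain-Scala sketch (names hypothetical):

```scala
// Row j of the outer product ev * transpose(ev), computed without
// building the full n x n matrix.
def outerRow(ev: Array[Double], j: Int): Array[Double] =
  ev.map(_ * ev(j))

// Full outer product, for comparison only -- this is the n x n object
// that would not fit in memory at scale.
def fullOuter(ev: Array[Double]): Array[Array[Double]] =
  ev.map(a => ev.map(b => a * b))
```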

To compute the adjusted e_i's in step 5 above, M_v is stored as an RDD of tuples (i, denseVector). Then:

val deltaRdd = R.join(M_v)
  .map { case (j, ((i, row_j_of_O_i), row_j_of_M_v)) =>
    (i, row_j_of_O_i.t * DenseVector(row_j_of_M_v.toArray) / (2 * mu))  // per-row dot product
  }
  .reduceByKey(_ + _)  // sum over rows j to get delta_i
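This works because summing the per-row dot products of row_j(O_i) and row_j(M_v) over j is the same as summing all elements of the elementwise product of O_i and M_v. A small local check (helper names made up):

```scala
// Dot product of two rows.
def dot(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (x, y) => x * y }.sum

// delta_i computed row by row, mirroring the reduceByKey(_ + _) above:
// sum of per-row dot products, divided by 2 * mu.
def deltaFromRows(oRows: Array[Array[Double]], mvRows: Array[Array[Double]],
                  mu: Double): Double =
  oRows.zip(mvRows).map { case (o, m) => dot(o, m) }.sum / (2 * mu)
```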

Finally, to compute A, again because of memory issues, I have to first join rows from the different RDDs and then reduce by key. Specifically:

val R_rearranged = R.map { case (j, (i, row_j_of_O_i)) => (i, (j, row_j_of_O_i)) }
val termsForA = R_rearranged.join(deltaRdd)
val A = termsForA.map { case (i, ((j, row_j_of_O_i), delta_i)) =>
    (j, (delta_i + e(i)) * row_j_of_O_i)  // scale row j of O_i by the adjusted eigenvalue
  }
  .reduceByKey(_ + _)  // sum contributions over all i into row j of A
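The reduceByKey computes, for each row index j, the sum over i of (e_i + delta_i) times row j of O_i, which is exactly row j of A. A local sketch of that row-wise assembly, assuming the adjusted eigenvalues are already known (names hypothetical):

```scala
// Row j of A = sum over i of adjustedEig(i) * (row j of O_i),
// where row j of O_i is evs(i)(j) * evs(i).
def rowOfA(j: Int, adjustedEigs: Array[Double],
           evs: Array[Array[Double]]): Array[Double] = {
  val acc = Array.fill(evs(0).length)(0.0)
  for (i <- adjustedEigs.indices) {
    val rowJofOi = evs(i).map(_ * evs(i)(j))          // row j of O_i, never the full matrix
    for (c <- acc.indices) acc(c) += adjustedEigs(i) * rowJofOi(c)
  }
  acc
}
```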

The above implementation works up to the termsForA step: an action on termsForA such as termsForA.take(1).foreach(println) succeeds. But an action on A, such as A.count(), causes an OOM error on the driver.

I tried tuning Spark's configuration to increase driver memory as well as the parallelism level, but none of it helped.


2 Answers


Use IndexedRowMatrix instead of RowMatrix; it will help with the conversions and the transpose. Suppose your IndexedRowMatrix is Irm:

svd = Irm.computeSVD(k, computeU=True)
U = svd.U
U = U.toCoordinateMatrix().transpose().toIndexedRowMatrix()

You can convert Irm to a BlockMatrix for multiplication with another distributed BlockMatrix.


I guess at some point Spark decided there was no need to carry out the operations on the executors and did all the work on the driver. Actually, termsForA would fail as well on an action like count. Somehow I made it work by broadcasting deltaRdd and e.