
I am computing cosine similarity between two large sets of vectors (with the same features). Each set of vectors is represented as a SciPy CSR sparse matrix, A and B. I want to compute A x B^T, which will not be sparse. However, I only need to keep values exceeding some threshold, e.g. 0.8. I am trying to implement this in PySpark with vanilla RDDs, with the idea of using the fast vector operations implemented for SciPy CSR matrices.

The rows of A and B are normalized, so to compute cosine similarity I simply need to find the dot product of each row from A with each row from B. The dimensions of A are 5,000,000 x 5,000. The dimensions of B are 2,000,000 x 5,000.
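Because the rows are unit-normalized, the thresholded product can be sanity-checked locally with SciPy alone before bringing Spark into it. A minimal sketch (the normalization helper and toy sizes are illustrative, not part of the real data):

```python
import numpy as np
from scipy import sparse

def l2_normalize_rows(M):
    """Scale each row of a sparse matrix to unit L2 norm (zero rows left as-is)."""
    norms = np.sqrt(np.asarray(M.multiply(M).sum(axis=1)).ravel())
    norms[norms == 0] = 1.0
    return sparse.diags(1.0 / norms) @ M

# Toy stand-ins for the 5,000,000 x 5,000 and 2,000,000 x 5,000 matrices.
A = l2_normalize_rows(sparse.rand(5, 8, density=0.5, format='csr', random_state=0))
B = l2_normalize_rows(sparse.rand(4, 8, density=0.5, format='csr', random_state=1))

C = (A @ B.T).tocoo()  # C[i, j] is the cosine of A row i and B row j
matches = [(i, j, v) for i, j, v in zip(C.row, C.col, C.data) if v > 0.8]
```

At the real dimensions C would have 10^13 entries, which is exactly why only the above-threshold triples are kept.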

Suppose A and B are too large to fit into memory on my worker nodes as broadcast variables. How should I approach parallelizing both A and B in an optimal way?

EDIT: After posting my solution below, I have been exploring other approaches which may be clearer and more optimal, particularly the columnSimilarities() function implemented for Spark MLlib IndexedRowMatrix objects. (See: Which pyspark abstraction is appropriate for my large matrix multiplication?)


1 Answer


I was able to implement a solution in this framework.
I'd welcome insight into why this solution is slow; is it the custom serialization?

from scipy import sparse
import time
# assumes `sc` is an existing SparkContext

def csr_mult_helper(pair, threshold=0.8):
    A_row = pair[0][0]  # row offset for this chunk of A
    B_col = pair[1][0]  # row offset for this chunk of B (a column index, after the transpose op)
    A = sparse.csr_matrix(pair[0][1], shape=pair[0][2])  # rebuild from non-zero entries and shape
    B = sparse.csr_matrix(pair[1][1], shape=pair[1][2])

    C = A * B.T  # scipy sparse matrix multiplication

    for row_idx, row in enumerate(C):  # I think it would be better to use a filter transformation instead,
        col_indices = row.indices      # but I had trouble with the row and column index bookkeeping
        col_values = row.data

        for col_idx, val in zip(col_indices, col_values):
            if val > threshold:
                yield (A_row + row_idx, B_col + col_idx, val)  # source vector, target vector, cosine score

def parallelize_sparse_csr(M, rows_per_chunk=1):
    rows, cols = M.shape
    i_row = 0
    submatrices = []
    while i_row < rows:
        current_chunk_size = min(rows_per_chunk, rows - i_row)
        submat = M[i_row:(i_row + current_chunk_size)]
        submatrices.append(   (i_row,                                #  offset
                              (submat.data, submat.indices, submat.indptr),  # sparse matrix data
                              (current_chunk_size, cols)) )      # sparse matrix shape
        i_row += current_chunk_size
    return sc.parallelize(submatrices)

########## generate test data ###########
K, L, M, N = 5, 2000, 3, 2000  # matrix dimensions: A_ is K x L, B_ is M x N (toy example)
A_ = sparse.rand(K,L, density=0.1, format='csr')
B_ = sparse.rand(M,N, density=0.1, format='csr')
print("benchmark: {} \n".format((A_ * B_.T).todense()))  # benchmark solution for comparison

########## parallelize, multiply, and filter #########
t_start = time.time()
A = parallelize_sparse_csr(A_, rows_per_chunk=10)
B = parallelize_sparse_csr(B_, rows_per_chunk=10)  # number of rows per partition, from B
            # warning: this code breaks if the rows_per_chunk parameter for B_ != 1,
            # although I don't understand why yet

print("custom pyspark solution: ")
result = A.cartesian(B).flatMap(csr_mult_helper).collect()
print(result)

print("\n {} s elapsed".format(time.time() - t_start))
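To check the chunking and offset bookkeeping without a cluster, the same round-trip can be sketched locally with SciPy alone (the function names here are illustrative, not part of the solution above):

```python
import numpy as np
from scipy import sparse

def to_chunks(M, rows_per_chunk):
    # Mirrors parallelize_sparse_csr: (offset, (data, indices, indptr), shape) tuples.
    rows, cols = M.shape
    chunks, i = [], 0
    while i < rows:
        n = min(rows_per_chunk, rows - i)
        sub = M[i:i + n]
        chunks.append((i, (sub.data, sub.indices, sub.indptr), (n, cols)))
        i += n
    return chunks

def multiply_chunks(a_chunk, b_chunk, threshold=0.8):
    # Mirrors csr_mult_helper: rebuild the CSR blocks and keep entries > threshold.
    a_off, a_parts, a_shape = a_chunk
    b_off, b_parts, b_shape = b_chunk
    A = sparse.csr_matrix(a_parts, shape=a_shape)
    B = sparse.csr_matrix(b_parts, shape=b_shape)
    C = (A @ B.T).tocoo()
    return [(a_off + i, b_off + j, v)
            for i, j, v in zip(C.row, C.col, C.data) if v > threshold]

A_ = sparse.rand(6, 8, density=0.5, format='csr', random_state=0)
B_ = sparse.rand(4, 8, density=0.5, format='csr', random_state=1)
hits = [t for ca in to_chunks(A_, 2) for cb in to_chunks(B_, 2)
        for t in multiply_chunks(ca, cb)]
```

Comparing `hits` against the dense product of the full matrices confirms the global (row, col, value) triples survive the serialize/rebuild round-trip intact.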