4 votes

I have to solve a large number of linear matrix equations of the type "Ax=B" for x, where A is a sparse matrix with mainly the main diagonal populated and B is a vector.

My first approach was to use dense numpy arrays for this purpose with numpy.linalg.solve, and it works fine with an (N,n,n)-dimensional array, N being the number of linear matrix equations and n the square matrix dimension. I first used it with a for loop iterating through all equations, which is in fact rather slow. But then I realized that you can also pass the (N,n,n)-dimensional array directly to numpy.linalg.solve without any for loop (which, by the way, I did not find in the documentation I read). This already gave a good increase in computation speed (details below).

However, because I have sparse matrices, I also had a look at the scipy.sparse.linalg.spsolve function, which does something similar to the corresponding numpy function. Using a for loop iterating through all equations is much, much faster than the numpy solutions, but it seems impossible to pass the (N,n,n)-dimensional array directly to scipy's spsolve.

Here is what I tried so far:

First, I calculate some fictional A matrices and B vectors with random numbers for test purposes, both sparse and dense:

import numpy as np
from scipy import sparse
from scipy.sparse.linalg import spsolve

number_of_systems = 100 #corresponds to N in the text
number_of_data_points = 1000 #corresponds to n in the text

#calculation of sample matrices (dense and sparse)
A_sparse = np.empty(number_of_systems,dtype=object)
A_dense = np.empty((number_of_systems,number_of_data_points,number_of_data_points))

for ii in np.arange(number_of_systems):
    A_sparse[ii] = sparse.spdiags(np.random.random(number_of_data_points),0,number_of_data_points,number_of_data_points)
    A_dense[ii] = A_sparse[ii].todense()

#calculation of sample vectors
B = np.random.random((number_of_systems,number_of_data_points))

1) First approach: numpy.linalg.solve with a for loop:

def solve_dense_3D(A,B):
    results = np.empty((A.shape[0],A.shape[1]))
    for ii in np.arange(A.shape[0]):
        results[ii] = np.linalg.solve(A[ii],B[ii])
    return results

result_dense_for = solve_dense_3D(A_dense,B)

Timing:

%timeit solve_dense_3D(A_dense,B)
1.25 s ± 27.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

2) Second approach: Passing the (N,n,n)-dimensional matrix directly to numpy.linalg.solve:

result_dense = np.linalg.solve(A_dense,B)

Timing:

%timeit np.linalg.solve(A_dense,B)
769 ms ± 9.68 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
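
As a quick sanity check (not part of the timings), the loop-free result matches the loop version from approach 1:

#the batched solve should agree with the per-system loop
assert np.allclose(result_dense, result_dense_for)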

3) Third approach: Using scipy.sparse.linalg.spsolve with a for loop:

def solve_sparse_3D(A,B):
    results = np.empty((A.shape[0],A[0].shape[0]))
    for ii in np.arange(A.shape[0]):
        results[ii] = spsolve(A[ii],B[ii])
    return results

result_sparse_for = solve_sparse_3D(A_sparse,B)

Timing:

%timeit solve_sparse_3D(A_sparse,B)
30.9 ms ± 132 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

It is obvious that there is an advantage in being able to omit the for loop, going from approach 1 to approach 2. By far the fastest alternative is, as could probably be expected, approach 3 with sparse matrices.

Because the number of equations in this example is still rather low for me, and because I have to do things like this very often, I would be happy if there were a solution using scipy's sparse matrices without a for loop. Is anybody aware of a way to achieve that? Or maybe there is yet another way to solve the problem? I would be happy for suggestions.

I'm not so sure that losing the loop will help (it's slow Python looping vs. an indication of independence, and the latter might be more important in sparse operations). Without thinking too much about it, it feels like you could just build a block-diagonal matrix of your independent problems (and grow b in sync, of course). - sascha
sascha, I am not sure either. I hope that an effect similar to the numpy approaches can somehow be achieved, where dropping the for loop saves roughly 40% of the computation time. Apart from that, I will have a look at your suggestion. - Snijderfrey
The loop itself shouldn't be a problem here, though numpy/scipy function-call overhead may be. Also, running your code I get a sparse efficiency warning. It turns out that, as it stands, the sparse code spends more than half its time converting to CSC/CSR format. - Paul Panzer
Is this a valid test? The matrices are all diagonal, so the inversion is just element-wise division of the right-hand-side vector by the diagonal. spsolve recognizes this structure and performs only that many operations. - Lutz Lehmann
LutzL, I get your point. In fact, my real data does not look quite as clean. Therefore, I repeated the methods with some real data (which is a little hard to show here) with qualitatively similar results, so I would assume that the test is more or less "valid". - Snijderfrey
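
To illustrate LutzL's point: for the purely diagonal test matrices above, the whole batch can be solved by plain element-wise division, with no solver involved at all. A minimal sketch, valid only under that diagonal assumption:

#for purely diagonal A, Ax = B reduces to x = B / diag(A);
#this vectorizes over all N systems at once (diagonal case only)
diagonals = np.array([A.diagonal() for A in A_sparse]) #shape (N,n)
result_diag = B / diagonals
assert np.allclose(result_diag, result_sparse_for)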

1 Answer

1 vote

A small demo outlining the idea from my comment above:

""" YOUR CODE (only imports changed + deterministic randomness) """

import numpy as np
from scipy import sparse
from scipy.sparse.linalg import spsolve
from scipy.sparse import block_diag
from time import perf_counter as pc

np.random.seed(0)

number_of_systems = 100 #corresponds to N in the text
number_of_data_points = 1000 #corresponds to n in the text

#calculation of sample matrices (dense and sparse)
A_sparse = np.empty(number_of_systems,dtype=object)
A_dense = np.empty((number_of_systems,number_of_data_points,number_of_data_points))

for ii in np.arange(number_of_systems):
    A_sparse[ii] = sparse.spdiags(np.random.random(number_of_data_points),0,number_of_data_points,number_of_data_points)
    A_dense[ii] = A_sparse[ii].todense()

#calculation of sample vectors
B = np.random.random((number_of_systems,number_of_data_points))

def solve_sparse_3D(A,B):
    results = np.empty((A.shape[0],A[0].shape[0]))
    for ii in np.arange(A.shape[0]):
        results[ii] = spsolve(A[ii],B[ii])
    return results

start = pc()
result_sparse_for = solve_sparse_3D(A_sparse,B)
end = pc()
print(result_sparse_for)
print(end - start)

""" ALTERNATIVE APPROACH """

def solve_sparse_3D_blockdiag(A,B):
    oldN = B.shape[0]

    A_ = block_diag(A)    # huge sparse block-matrix of independent problems
    B_ = B.ravel()        # flattened vector

    results = spsolve(A_, B_)
    return results.reshape(oldN, -1)    # unflatten results

start = pc()
result_sparse_for = solve_sparse_3D_blockdiag(A_sparse,B)
end = pc()
print(result_sparse_for)
print(end - start)

which outputs

[[ 0.97529866  1.26406276  0.83348888 ...  0.99310639  3.90781207
0.16724226]
[ 1.23398934 28.82088739  1.6955886  ...  1.85011686  0.23386882
1.17208753]
[ 0.92864777  0.22248781  0.09445412 ...  2.5080376   0.91701228
0.97266564]
...
[ 0.33087093  0.89034736  1.7523883  ...  0.2171746   4.89236164
0.31546549]
[ 1.2163625   3.0100941   0.87216264 ...  1.62105596  0.33211353
2.07929302]
[ 5.35677404  1.23830776  0.16073721 ...  0.26492506  0.53676822
3.73192617]]
0.08764066299999995

###

[[ 0.97529866  1.26406276  0.83348888 ...  0.99310639  3.90781207
0.16724226]
[ 1.23398934 28.82088739  1.6955886  ...  1.85011686  0.23386882
1.17208753]
[ 0.92864777  0.22248781  0.09445412 ...  2.5080376   0.91701228
0.97266564]
...
[ 0.33087093  0.89034736  1.7523883  ...  0.2171746   4.89236164
0.31546549]
[ 1.2163625   3.0100941   0.87216264 ...  1.62105596  0.33211353
2.07929302]
[ 5.35677404  1.23830776  0.16073721 ...  0.26492506  0.53676822
3.73192617]]
0.07241856000000013

There are some things still to do:

  • use your original, more rigorous benchmarking approach
  • build the block_diag in the correct sparse format to get rid of a potential warning and slowdown (see the docs; sketched below)
  • tune spsolve's parameter permc_spec (also sketched below)
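
A sketch of the last two points. The format argument of block_diag and the permc_spec parameter of spsolve both exist in scipy; whether 'NATURAL' is a good ordering for these (nearly) diagonal systems is an assumption that needs benchmarking on real data:

from scipy.sparse import block_diag
from scipy.sparse.linalg import spsolve

def solve_sparse_3D_blockdiag_csc(A, B):
    oldN = B.shape[0]

    #build the block-diagonal matrix directly in CSC format, the format
    #spsolve's LU factorization works with; this avoids the
    #SparseEfficiencyWarning and the implicit conversion
    A_ = block_diag(A, format='csc')
    B_ = B.ravel()

    #permc_spec='NATURAL' skips the column reordering; for (nearly)
    #diagonal systems this may help, but it is only a guess
    results = spsolve(A_, B_, permc_spec='NATURAL')
    return results.reshape(oldN, -1)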