
The GitHub repository NM-Heston prices call options under the two-factor Heston model using ADI splitting schemes. I am adapting the code to price options under the three-factor Heston-Hull-White model, also using ADI splitting schemes. While doing this, I became curious whether it is possible to get rid of the nested for loops in the make_matrices() function and create a vectorized version instead, similar to my question posted here: Solving a 3D heat diffusion PDE.

The code within mat_factory.py is too long to post here and I have instead taken a snippet of the code, tweaked it, and added it below. I would like to understand how to convert it to a vectorized version. Once I understand, I will do it for the entire piece of code. The snippet is as follows:

import numpy as np

x = 5
y = 3

Vec_x = np.linspace(0, 1, x + 1)
Vec_y = np.linspace(0, 1, y + 1)

m = (x + 1) * (y + 1)
A_0 = np.zeros((m, m))

for i in range(1, x):
    for j in range(1, y):
        A_0[i + j * (x + 1), (i - 1) + (j + -1) * (x + 1)] += 2 * Vec_x[i] * Vec_y[j] * (Vec_x[i + 1] - Vec_x[i]) / (Vec_x[i] - Vec_x[i - 1]) / (Vec_x[i + 1] - Vec_x[i - 1]) * (Vec_y[j + 1] - Vec_y[j]) / (Vec_y[j] - Vec_y[j - 1]) / (Vec_y[j + 1] - Vec_y[j - 1])

print(sum(sum(A_0)))
# 15.0

I am therefore trying to get rid of the nested for loops in the code snippet above, and replace it with a vectorized version. I am doing this to see if I can cut down on execution time, while also shortening the code.

  • Please describe the high-level use case you're trying to solve, the problem that a colleague, perhaps a manager, would see value in. And then, at a lower level, can you describe that expression as a step in a convolution or other familiar expression? – J_H, Jan 27, 2023 at 23:14
  • The code within mat_factory.py is too long to post here - no, it's not. It's a mere 96 lines, and you've omitted some crucial context, including that you do target sparse matrices; you just use the wrong entry point to a sparse representation. – Reinderien, Jan 29, 2023 at 16:46
  • This is an interesting question and an interesting project. I recommend that you integrate the feedback you received, and then post a new question containing all of mat_factory.py. – Reinderien, Jan 30, 2023 at 13:39
  • Hi @Reinderien. Thank you for your detailed answer. I have integrated your solution in my main code as best as possible. Would you please have a look at the following post as well, if it won't be any trouble? – Ruan, Feb 5, 2023 at 12:40

1 Answer


Step 1: don't linspace your sample data. Uniformly spaced values let a certain category of transposition error pass unnoticed. Instead, just use random data for more confidence in the regression tests.

Format

Step 2: your formatting is completely impenetrable, and in its current state masks the actual structure of the problem. Reformat to:

import numpy as np
from numpy.random import default_rng

y = 3
x = 5

rand = default_rng(seed=0)
vec_y = rand.random(y + 1)
vec_x = rand.random(x + 1)

m = (y + 1)*(x + 1)


def old() -> np.ndarray:
    A_0 = np.zeros((m, m))

    for i in range(1, y):
        for j in range(1, x):
            A_0[
                j + (i    )*(x + 1),
                j + (i - 1)*(x + 1) - 1,
            ] += (
                  vec_y[i] * (vec_y[i + 1] - vec_y[i])
                * vec_x[j] * (vec_x[j + 1] - vec_x[j])
                / (vec_y[i] - vec_y[i - 1]) / (vec_y[i + 1] - vec_y[i - 1])
                / (vec_x[j] - vec_x[j - 1]) / (vec_x[j + 1] - vec_x[j - 1])
            )

    return 2*A_0

While you do this, rearrange your loops and expressions into row-major order: \$y\$ and \$i\$ first, with \$i\$ by convention the vertical index.

Linear algebra

Now do your linear-algebraic analysis. Your problem has a high degree of structure: in fact, every nonzero entry of \$A_0\$ lies on a single sub-diagonal, and all other diagonals are nothing but 0. This follows from the relation

$$ a_j = a_i - x - 2 $$

with \$a_i\$ and \$a_j\$ the row and column indices into your matrix: the difference \$a_i - a_j = x + 2\$ is the same on every loop iteration, so every assignment lands on the same diagonal. And since \$a_i\$ takes a distinct value on each iteration, no element is written more than once; the indices on that diagonal are unique.
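
As a quick sanity check, here is a sketch that continues the reformatted snippet above (so old(), x and np are assumed to still be in scope):

A_0 = old()
rows, cols = np.nonzero(A_0)
# every nonzero entry sits on the diagonal at offset -(x + 2) == -7
assert np.all(rows - cols == x + 2)
# and no element is written more than once
assert rows.size == np.unique(rows).size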

Now observe that there are two sub-expressions, one for each of \$\vec x\$ and \$\vec y\$, that are structurally identical, independent and evaluated in one dimension. This is due to your ADI construction. Get your offsets and define a common function via

def adi_shift(a: np.ndarray) -> np.ndarray:
    # Three overlapping views of length a.size - 2: a[:-2], a[1:-1], a[2:],
    # i.e. the i-1, i and i+1 terms for every interior point at once.
    a0, a1, a2 = sliding_window_view(a, a.size - 2)
    return a1 * (a2 - a1) / (a1 - a0) / (a2 - a0)
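
For intuition, sliding_window_view with a window length of a.size - 2 yields exactly three overlapping views, lining up with the i-1, i and i+1 terms of the loop. A minimal demonstration:

import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

a = np.array([10., 20., 40., 80.])
print(sliding_window_view(a, a.size - 2))
# [[10. 20.]   a0 == a[:-2]
#  [20. 40.]   a1 == a[1:-1]
#  [40. 80.]]  a2 == a[2:]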

In the main routine, the diagonal is now

2 * np.outer(adi_shift(vec_y), adi_shift(vec_x))

In other words, with \$\vec y\$ as a column vector and \$\vec x\$ as a row vector, take their outer product. This will not yet be in dimensions suitable for \$A_0\$.
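
Concretely, for the 4x6 test data used below, the outer product is only a (2, 4) block, while the target diagonal of the 24x24 matrix holds 17 entries; the index vectors defined next scatter the block onto that diagonal. A quick shape check, assuming adi_shift from above is in scope:

import numpy as np
from numpy.random import default_rng

rand = default_rng(seed=0)
vec_x = rand.random(6)
vec_y = rand.random(4)
block = 2 * np.outer(adi_shift(vec_y), adi_shift(vec_x))
print(block.shape)  # (2, 4): one row per interior i, one column per interior j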

In the main routine, define your indices not as scalars but as vectors, and do not add 1 to your lengths:

y = len(vec_y)
x = len(vec_x)
i = np.arange(1, y-1)        # interior row positions
j = np.arange(1, x-1)        # interior column positions
ai = np.add.outer(i*x, j)    # row index j + i*x for every (i, j) pair
aj = ai - (x + 1)            # column index: a constant offset below ai
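
With the 4x6 test sizes, these index arrays make the single-diagonal structure visible:

import numpy as np

y, x = 4, 6                  # the test sizes used below
i = np.arange(1, y - 1)      # [1 2]
j = np.arange(1, x - 1)      # [1 2 3 4]
ai = np.add.outer(i*x, j)
aj = ai - (x + 1)
print(ai)
# [[ 7  8  9 10]
#  [13 14 15 16]]
print(ai - aj)               # constant 7: everything lies on diagonal -(x + 1)
# [[7 7 7 7]
#  [7 7 7 7]]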

Create \$A_0\$ as you were, but do not add to it. One error in the original implementation is that the individual elements were accumulated with += when each element is only ever written once; they need to be plain assignments. Assign by index:

A_0[ai, aj] = 2 * np.outer(adi_shift(vec_y), adi_shift(vec_x))

Move all the code into functions and add PEP 484 type hints.

Suggested

All together:

from typing import Callable

import numpy as np
from numpy.lib.stride_tricks import sliding_window_view
from numpy.random import default_rng


def old(vec_y: np.ndarray, vec_x: np.ndarray) -> np.ndarray:
    y = len(vec_y) - 1
    x = len(vec_x) - 1
    m = (y + 1)*(x + 1)
    A_0 = np.zeros((m, m))

    for i in range(1, y):
        for j in range(1, x):
            A_0[
                j + (i    )*(x + 1),
                j + (i - 1)*(x + 1) - 1,
            ] += (
                  vec_y[i] * (vec_y[i + 1] - vec_y[i])
                * vec_x[j] * (vec_x[j + 1] - vec_x[j])
                / (vec_y[i] - vec_y[i - 1]) / (vec_y[i + 1] - vec_y[i - 1])
                / (vec_x[j] - vec_x[j - 1]) / (vec_x[j + 1] - vec_x[j - 1])
            )

    return 2*A_0


def adi_shift(a: np.ndarray) -> np.ndarray:
    a0, a1, a2 = sliding_window_view(a, a.size - 2)
    return a1 * (a2 - a1) / (a1 - a0) / (a2 - a0)


def new(vec_y: np.ndarray, vec_x: np.ndarray) -> np.ndarray:
    y = len(vec_y)
    x = len(vec_x)
    i = np.arange(1, y-1)
    j = np.arange(1, x-1)
    ai = np.add.outer(i*x, j)
    aj = ai - (x + 1)
    m = y*x
    A_0 = np.zeros((m, m))
    A_0[ai, aj] = 2 * np.outer(adi_shift(vec_y), adi_shift(vec_x))
    return A_0


def test(
    vec_y: np.ndarray,
    vec_x: np.ndarray,
    method: Callable[[np.ndarray, np.ndarray], np.ndarray],
) -> None:
    A_0 = method(vec_y, vec_x)
    # -7 is the expected diagonal offset: -(x + 1) with x == len(vec_x) == 6
    assert 0 == np.count_nonzero(np.tril(A_0, k=-7 - 1))
    assert 0 == np.count_nonzero(np.triu(A_0, k=-7 + 1))
    assert np.allclose(
        np.diag(A_0, -7),
        [
            -9.88163813,  -0.60548181, -24.43299993,   3.96913142,
             0.        ,   0.        ,   3.14201806,   0.19252221,
             7.76884623,  -1.26204607,   0.        ,   0.        ,
             0.        ,   0.        ,   0.        ,   0.        ,
             0.        ,
        ]
    )


def main() -> None:
    rand = default_rng(seed=0)
    vec_x = rand.random(6)
    vec_y = rand.random(4)
    test(vec_y, vec_x, old)
    test(vec_y, vec_x, new)


if __name__ == '__main__':
    main()

True diagonal initialisation

One potential optimisation is to build an actual diagonal vector first, and then call diagflat; this is equivalent and considerably simpler:

def new(vec_y: np.ndarray, vec_x: np.ndarray) -> np.ndarray:
    y = len(vec_y)
    x = len(vec_x)
    # Pad the (y-2, x-2) block with zero rows and columns so that, once
    # ravelled, each entry lands at its correct position along the diagonal.
    diag = np.zeros((y-1, x))
    diag[:-1, :-2] = 2 * np.outer(adi_shift(vec_y), adi_shift(vec_x))
    # The diagonal holds y*x - (x + 1) entries: one fewer than diag has.
    return np.diagflat(diag.ravel()[:-1], k=-1 - x)
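
The length arithmetic works out: diag.ravel() has (y-1)*x entries, while the diagonal at offset -(x+1) of the y*x-square matrix holds y*x - (x+1) = (y-1)*x - 1 of them, hence the trailing [:-1]. A sketch of an equivalence check against the fancy-indexing version (with the two implementations renamed new_indexed and new_diagflat here, since both are called new above):

rand = default_rng(seed=0)
vec_x = rand.random(6)
vec_y = rand.random(4)
assert np.allclose(new_indexed(vec_y, vec_x), new_diagflat(vec_y, vec_x))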

Sparse matrices

You have not offered any indication as to the true scale of your problem. Sparse matrices may be called for; read up on scipy.sparse.diags. Past a certain point \$A_0\$ simply cannot be represented as a dense matrix, and only a sparse matrix is feasible: at the 30000x2000 benchmark size below, \$m = 6 \times 10^7\$, so a dense \$A_0\$ would need on the order of \$8m^2 \approx 3 \times 10^{16}\$ bytes:

from timeit import timeit
from typing import Callable, Union

import numpy as np
import scipy.sparse
from numpy.lib.stride_tricks import sliding_window_view
from numpy.random import default_rng
from scipy.sparse import spmatrix


def old(vec_y: np.ndarray, vec_x: np.ndarray) -> np.ndarray:
    y = len(vec_y) - 1
    x = len(vec_x) - 1
    m = (y + 1)*(x + 1)
    A_0 = np.zeros((m, m))

    for i in range(1, y):
        for j in range(1, x):
            A_0[
                j + (i    )*(x + 1),
                j + (i - 1)*(x + 1) - 1,
            ] += (
                  vec_y[i] * (vec_y[i + 1] - vec_y[i])
                * vec_x[j] * (vec_x[j + 1] - vec_x[j])
                / (vec_y[i] - vec_y[i - 1]) / (vec_y[i + 1] - vec_y[i - 1])
                / (vec_x[j] - vec_x[j - 1]) / (vec_x[j + 1] - vec_x[j - 1])
            )

    return 2*A_0


def adi_shift(a: np.ndarray) -> np.ndarray:
    a0, a1, a2 = sliding_window_view(a, a.size - 2)
    return a1 * (a2 - a1) / (a1 - a0) / (a2 - a0)


def new(vec_y: np.ndarray, vec_x: np.ndarray) -> spmatrix:
    y = len(vec_y)
    x = len(vec_x)
    diag = np.zeros((y-1, x))
    diag[:-1, :-2] = 2 * np.outer(adi_shift(vec_y), adi_shift(vec_x))
    return scipy.sparse.diags(diag.ravel()[:-1], offsets=-1 - x)


def test(
    vec_y: np.ndarray,
    vec_x: np.ndarray,
    method: Callable[[np.ndarray, np.ndarray], Union[np.ndarray, spmatrix]],
) -> None:
    A_0 = method(vec_y, vec_x)
    if scipy.sparse.issparse(A_0):
        assert 0 == scipy.sparse.tril(A_0, k=-7 - 1).count_nonzero()
        assert 0 == scipy.sparse.triu(A_0, k=-7 + 1).count_nonzero()
        diag = A_0.diagonal(k=-7)
    else:
        assert 0 == np.count_nonzero(np.tril(A_0, k=-7 - 1))
        assert 0 == np.count_nonzero(np.triu(A_0, k=-7 + 1))
        diag = np.diag(A_0, -7)
    assert np.allclose(
        diag,
        [
            -9.88163813,  -0.60548181, -24.43299993,   3.96913142,
             0.        ,   0.        ,   3.14201806,   0.19252221,
             7.76884623,  -1.26204607,   0.        ,   0.        ,
             0.        ,   0.        ,   0.        ,   0.        ,
             0.        ,
        ]
    )


def main() -> None:
    rand = default_rng(seed=0)
    vec_x = rand.random(6)
    vec_y = rand.random(4)
    test(vec_y, vec_x, old)
    test(vec_y, vec_x, new)

    vec_y = rand.random(17)
    vec_x = rand.random(26)
    a0 = old(vec_y, vec_x)
    a1 = new(vec_y, vec_x)
    assert np.allclose(a0, a1.toarray())

    vec_y = rand.random(30_000)
    vec_x = rand.random(2_000)
    def run():
        return new(vec_y, vec_x)
    print(
        f'{len(vec_y)}x{len(vec_x)} '
        f'in {timeit(run, number=1):.4f}'
    )


if __name__ == '__main__':
    main()

Performance

Using a test routine of

def main() -> None:
    rand = default_rng(seed=0)
    vec_x = rand.random(6)
    vec_y = rand.random(4)
    test(vec_y, vec_x, old)
    test(vec_y, vec_x, new)

    vec_y = rand.random(17)
    vec_x = rand.random(26)
    a0 = old(vec_y, vec_x)
    a1 = new(vec_y, vec_x)
    assert np.allclose(a0, a1)

    vec_y = rand.random(250)
    vec_x = rand.random(150)
    def run():
        return new(vec_y, vec_x)
    print(
        f'{len(vec_y)}x{len(vec_x)} '
        f'in {timeit(run, number=1):.4f}'
    )

with the diagflat method, this executes quickly:

250x150 in 0.0691

I run out of memory before I run out of time.

With the sparse method this is vastly improved:

250x150 in 0.0005
30000x2000 in 0.5826

Comparing the original and sparse methods at size 200x100, the sparse method is faster by a factor of about 270.
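
For reference, a sketch of the kind of harness behind that number, assuming old and the sparse new from the listing above are in scope:

from timeit import timeit

from numpy.random import default_rng

rand = default_rng(seed=0)
vec_y = rand.random(200)
vec_x = rand.random(100)
t_old = timeit(lambda: old(vec_y, vec_x), number=5) / 5
t_new = timeit(lambda: new(vec_y, vec_x), number=5) / 5
print(f'old: {t_old:.4f}s  new: {t_new:.4f}s  speedup: {t_old / t_new:.0f}x')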

  • zomg, better living through math. Brilliant! – J_H, Jan 28, 2023 at 17:59
