First pass
Add PEP484 type hints.
In your loops, you can pre-index some vectors for a minor speedup.
Prefer member .dot() over np.dot().
Prefer in-place add += rather than form x = x + ... for ndarray arguments.
Avoid native pow() in this context. np.square() is more appropriate.
Always set a known seed for predictable tests. Also: write tests! The tests I show give confidence that the results of matrix_factorization are exactly the same after refactoring.
A light refactor of your original code looks like
import numpy as np def matrix_factorization( R: np.ndarray, # rating matrix of a user with an item; 0 means prior P: np.ndarray, # score on latent features to predict R (will be modified) Q: np.ndarray, # score on latent features to predict R (will be modified) K: int, # number of extra features max_steps: int = 5_000, alpha: float = 2e-4, beta: float = 0.02, epsilon: float = 1e-3, ) -> tuple[ np.ndarray, # P np.ndarray, # Q ]: Q = Q.T for _ in range(max_steps): for i in range(len(R)): pi = P[i,:] ri = R[i,:] for j, rij in enumerate(ri): qj = Q[:,j] eij = 2*(rij - pi.dot(qj)) if rij > 0: for k in range(K): pi[k] += alpha*(eij*qj[k] - beta*pi[k]) qj[k] += alpha*(eij*pi[k] - beta*qj[k]) e = 0 for i in range(len(R)): pi = P[i,:] ri = R[i,:] for j, rij in enumerate(ri): qj = Q[:,j] if rij > 0: e += np.square(rij - pi.dot(qj)) + sum( 0.5*beta*(np.square(pi[k]) + np.square(qj[k])) for k in range(K) ) if e < epsilon: break return P, Q.T def test() -> None: R = np.array(( (5, 3, 0, 1), (4, 0, 0, 1), (1, 1, 0, 5), (1, 0, 0, 4), (0, 1, 5, 4), )) N, M = R.shape K = 2 rand = np.random.default_rng(seed=0) P = rand.random((N, K)) Q = rand.random((M, K)) nP, nQ = matrix_factorization(R, P, Q, K) assert np.allclose( ( ( 2.09499876, -0.47952543), ( 1.70134553, -0.29322251), ( 1.15021851, 2.01373816), ( 0.97941849, 1.58532621), ( 1.17893093, 1.51589598), ), nP, rtol=0, atol=1e-8, ) assert np.allclose( ( (2.20776869, -0.74542874), (1.34473580, -0.31621481), (2.03835722, 1.67576850), (0.92082345, 1.93850648), ), nQ, rtol=0, atol=1e-8, ) nR = nP @ nQ.T assert np.allclose( ( (4.98272469, 2.96885287, 3.46678224, 0.99956084), (3.97475387, 2.38058154, 2.97657691, 0.99822514), (1.0383181 , 0.90996617, 5.71911518, 4.96279265), (0.98058175, 0.81575547, 4.65304446, 3.97503663), (1.47281436, 1.10600186, 4.9433731 , 4.02416142), ), nR, rtol=0, atol=1e-8, ) priors = R != 0 assert np.allclose(R[priors], nR[priors], atol=0, rtol=0.1) if __name__ == '__main__': test()
Algorithm
The quuxlabs guide you reference is... not good. It's typical of "academic" code: it writes out loops element-by-element without using the libraries properly. Also, it has a very bad termination condition; for your example data it always runs to the iteration limit and the e tolerance termination never fires.
It's probably possible to salvage that implementation if it were written in C and the termination condition improved. I don't go down that path. Instead, I show a generic least-squares approach. The quality of this approach hinges on a good Jacobian definition, and thankfully that's straightforward since the system is linear (but still requires some tricky Kronecker products).
This approach is fast. It converges in five iterations for input the same size as your original example.
It works especially well if you use the support for sparse Jacobians through the linear least-squares minres aka. LSMR backend of lsmr.
Suggested implementation
This includes benchmarks and tests:
import functools import time import typing import numpy as np import pandas as pd import scipy.optimize import scipy.sparse as ssp from numpy.linalg import LinAlgError def unpack( decision: np.ndarray, mask_shape: tuple[int, int], ) -> tuple[ np.ndarray, np.ndarray, np.ndarray, # phat, qhat, rhat ]: """ Unpack the flat decision variable vector to P, Q, R matrices """ n, m = mask_shape k = decision.size//(m + n) phat, qhat = np.split(decision, (n*k,)) phat.resize((n, k)) qhat.resize((k, m)) return phat, qhat, phat @ qhat def residual( decision: np.ndarray, prior_mask: np.ndarray, alpha: float, beta: float, P: np.ndarray, Q: np.ndarray, Rprior: np.ndarray, ) -> np.ndarray: """ Given a decision vector, return the residual vector for those decisions. The residuals cover P error from original, Q error from original, and (only for nonzero R aka. the mask of prior elements) R error from original. """ phat, qhat, rhat = unpack(decision=decision, mask_shape=prior_mask.shape) eij = rhat[prior_mask] - Rprior return np.concat(( alpha*eij, beta*(phat - P).ravel(), beta*(qhat - Q).ravel(), )) def jacobian( decision: np.ndarray, prior_mask: np.ndarray, dpfactor: ssp.dia_array, dqfactor: ssp.dia_array, dpq_dpq: ssp.dia_array, dense: bool, ) -> np.ndarray: """ Produce Jacobian matrix for a decision vector. The first dimension covers flattened sizes for R residual, P residual and Q residual; the second dimension covers flattened sizes for P decision and Q decision. The R residual is only defined for prior elements. """ phat, qhat, rhat = unpack(decision=decision, mask_shape=prior_mask.shape) # Rerr by P+Q dr_dpq = ssp.hstack( blocks=(ssp.kron(dpfactor, qhat.T), ssp.kron(phat, dqfactor)), format='csr', )[prior_mask.ravel()] grad = ssp.vstack(blocks=(dr_dpq, dpq_dpq), format='csr') if dense: return grad.toarray() return grad def make_sparsity( prior_mask: np.ndarray, k: int, ) -> ssp.csr_array: n, m = prior_mask.shape dr_dpq = ssp.hstack( blocks=( ssp.kron(ssp.eye_array(n), np.ones((m, k))), ssp.kron(np.ones((n, k)), ssp.eye_array(m)), ), format='csr', )[prior_mask.ravel()] dpq_dpq = ssp.eye_array(n*k + k*m) return ssp.vstack(blocks=(dr_dpq, dpq_dpq), format='csr') def matrix_factorization( # scores on latent features to predict R P: np.ndarray, # strength of the associations between a user and the features Q: np.ndarray, # strength of the associations between an item and the features R: np.ndarray, # rating matrix of a user with an item; nonzero means prior # algorithm to use in least_squares method: typing.Literal['trf', 'dogbox', 'lm'] = 'trf', # trust region solver to use in least_squares. lsmr will pass a sparsity structure. tr_solver: None | typing.Literal['exact', 'lsmr'] = 'lsmr', loss: typing.Literal[ 'linear', 'soft_l1', 'huber', 'cauchy', 'arctan', ] = 'linear', # loss function for least_squares max_nfev: int | None = None, # maximum number of function evaluations in least_squares ftol: float = 1e-3, # termination tolerance in least_squares alpha: float = 0.9, # weight of R error beta: float = 0.1, # weight of P, Q error jac_tol: float | None = 1e-7, # if used, the allowed tolerance in a Jacobian sanity check ) -> tuple[ np.ndarray, np.ndarray, np.ndarray, # Phat, Qhat, Rhat scipy.optimize.OptimizeResult, # from least_squares ]: """ Find Phat, Qhat such that: Phat ~ P (n,k) Qhat ~ Q (k,m) Phat @ Qhat ~ R (n,m) for nonzero "prior" R """ prior_mask: np.ndarray = R != 0 n, k = P.shape k, m = Q.shape x0 = np.concatenate((P.ravel(), Q.ravel())) dpfactor = ssp.diags_array(np.full(shape=n, fill_value=alpha)) dqfactor = ssp.diags_array(np.full(shape=m, fill_value=alpha)) dpq_dpq = ssp.diags_array(np.full(shape=x0.size, fill_value=beta)) dense = method == 'lm' or tr_solver == 'exact' if dense: jac_sparsity: ssp.csr_array | None = None # not supported else: jac_sparsity = make_sparsity(prior_mask=prior_mask, k=k) fun = functools.partial( residual, prior_mask=prior_mask, P=P, Q=Q, Rprior=R[prior_mask], alpha=alpha, beta=beta, ) jac = functools.partial( jacobian, prior_mask=prior_mask, dense=dense, dpfactor=dpfactor, dqfactor=dqfactor, dpq_dpq=dpq_dpq, ) if jac_tol is not None: err = scipy.optimize.check_grad(func=fun, grad=jac, x0=x0) if err > jac_tol: raise ValueError('Gradient incorrect') result = scipy.optimize.least_squares( fun=fun, x0=x0, jac=jac, method=method, ftol=ftol, x_scale='jac', loss=loss, max_nfev=max_nfev, tr_solver=tr_solver, jac_sparsity=jac_sparsity, ) if not result.success: raise ValueError(result.message) return unpack(decision=result.x, mask_shape=R.shape) + (result,) def test() -> None: m = 4 n = 5 k = 2 rand = np.random.default_rng(seed=0) Phidden = rand.random(size=(n, k)) Qhidden = rand.random(size=(k, m)) Rhidden = Phidden @ Qhidden Ptrain = Phidden.round(1) Qtrain = Qhidden.round(1) Rtrain = Rhidden.round(2)*(( (1, 1, 0, 1), (1, 0, 0, 1), (1, 1, 0, 1), (1, 0, 0, 1), (0, 1, 1, 1), )) for method in ('trf', 'dogbox', 'lm'): for tr_solver in (None, 'exact', 'lsmr'): Phat, Qhat, Rhat, result = matrix_factorization( P=Ptrain, Q=Qtrain, R=Rtrain, method=method, tr_solver=tr_solver, ) assert np.allclose(Ptrain, Phat, rtol=0, atol=0.1) assert np.allclose(Qtrain, Qhat, rtol=0, atol=0.1) assert np.allclose(Rtrain[Rtrain != 0], Rhat[Rtrain != 0], rtol=0, atol=0.01) def benchmark() -> None: results = [] rand = np.random.default_rng(seed=0) for _ in range(3): # best of for n in (5, 10, 20, 40): m = int(0.8*n) for k_factor in (0.4, 0.8): k = int(k_factor*n) Phidden = rand.random(size=(n, k)) Qhidden = rand.random(size=(k, m)) Rhidden = Phidden @ Qhidden Ptrain = Phidden.round(1) Qtrain = Qhidden.round(1) for sparsity in (0.25, 0.75): Rtrain = Rhidden.round(2) * ( rand.random(size=Rhidden.shape) > sparsity ) for method in ('trf', 'dogbox', 'lm'): if method == 'lm': solvers = (None,) else: solvers = ('exact', 'lsmr') for tr_solver in solvers: if method == 'lm': losses = ('linear',) else: losses = ('linear', 'soft_l1', 'huber', 'cauchy', 'arctan') for loss in losses: row = ( sparsity, k_factor, n, method or 'None', tr_solver or 'None', loss, ) print(row) start = time.perf_counter() try: matrix_factorization( P=Ptrain, Q=Qtrain, R=Rtrain, method=method, tr_solver=tr_solver, loss=loss, jac_tol=None, ) dur = time.perf_counter() - start results.append(row + (dur,)) except LinAlgError: pass independents = ['sparsity', 'k_factor', 'n', 'method', 'tr_solver', 'loss'] df = pd.DataFrame( data=results, columns=independents + ['dur'], ) df = df.groupby(independents).min().sort_values([ 'sparsity', 'k_factor', 'n', 'dur', ]) df.reset_index().to_csv('benchmark.csv') pd.options.display.max_columns = 99 pd.options.display.max_rows = None pd.options.display.width = 200 print(df) if __name__ == '__main__': test() benchmark()
The output matrices of the test data look like
nfev = 5 njev = 5 P = [[0.6 0.3] [0. 0. ] [0.8 0.9] [0.6 0.7] [0.5 0.9]] ~ [[0.6166414 0.2883536 ] [0.04428404 0.01744651] [0.81438868 0.90631738] [0.60317056 0.73364438] [0.49604817 0.91790365]] Q = [[0.8 0. 0.9 0. ] [0.7 0.2 0.9 0.5]] ~ [[ 0.82958178 -0.00997023 0.89884808 0.01698354] [ 0.72185913 0.18847501 0.89786845 0.55756378]] R = [[0.72 0.05 0. 0.17] [0.05 0. 0. 0.01] [1.33 0.16 0. 0.52] [1.03 0. 0. 0.42] [0. 0.17 1.27 0.52]] ~ [[0.71970515 0.04819939 0.81317055 0.17124827] [0.04933115 0.00284671 0.05546929 0.01047964] [1.32983547 0.16269853 1.54576548 0.51916094] [1.0299672 0.13225988 1.20087485 0.4192975 ] [1.07410964 0.16805619 1.27002867 0.52021447]]
Benchmarks
When graphing the data via
import pandas as pd import matplotlib.pyplot as plt import seaborn df = pd.read_csv('benchmark.csv') df['tr_solver'] = df['tr_solver'].fillna('none') df['solver'] = df['method'] + '-' + df['tr_solver'] ul: plt.Axes ur: plt.Axes bl: plt.Axes br: plt.Axes fig, ((ul, ur), (bl, br)) = plt.subplots(nrows=2, ncols=2) for ax, ((k_factor, sparsity), group) in zip( (ul, ur, bl, br), df.groupby(['k_factor', 'sparsity']), ): seaborn.lineplot( data=group, ax=ax, x='n', y='dur', # style='loss', hue='solver', ) ax.set_xscale('log') ax.set_yscale('log') ax.set_title(f'k_factor={k_factor} sparsity={sparsity}') plt.show()
we see that LSMR is strongly preferred over multiple R sparsities and values of k:

sklearn.decomposition? \$\endgroup\$