Source code for mrinversion.linear_model.fista_lasso

import os

import csdmpy as cp
import matplotlib.pyplot as plt
import numpy as np

from mrinversion.linear_model._base_l1l2 import prepare_signal
from mrinversion.linear_model.fista import fista
from mrinversion.linear_model.fista import fista_cv

__author__ = "Deepansh Srivastava"
CPU_COUNTS = os.cpu_count()


[docs]class LassoFista: def __init__( self, lambda1=1e-3, max_iterations=50000, tolerance=5.2e-3, positive=True, inverse_dimension=None, ): self.hyperparameters = {"lambda": lambda1} self.max_iterations = max_iterations self.tolerance = tolerance self.positive = positive self.inverse_dimension = inverse_dimension
[docs] def fit(self, K, s, warm_start=False): s_, self.scale = prepare_signal(s) sin_val = np.linalg.svd(K, full_matrices=False)[1] K_, s_ = np.asfortranarray(K), np.asfortranarray(s_) self.f = np.asfortranarray(np.zeros((K_.shape[1], s_.shape[1]))) lipszit = sin_val[0] ** 2 if warm_start: self.f_1 = np.asfortranarray(np.zeros((K_.shape[1], 1))) _ = fista( matrix=K_, s=s_.mean(axis=1), lam=self.hyperparameters["lambda"], max_iter=self.max_iterations, f_k=self.f_1, nonnegative=int(self.positive), l_inv=(1 / lipszit), tol=self.tolerance, ) self.f = np.asfortranarray(np.tile(self.f_1, s_.shape[1])) _ = fista( matrix=K_, s=s_, lam=self.hyperparameters["lambda"], max_iter=self.max_iterations, f_k=self.f, nonnegative=int(self.positive), l_inv=(1 / lipszit), tol=self.tolerance, ) self.f *= self.scale if not isinstance(s, cp.CSDM): return # CSDM pack self.f = cp.as_csdm(np.squeeze(self.f.T)) app = self.inverse_dimension.application or {} if "com.github.deepanshs.mrinversion" in app: meta = app["com.github.deepanshs.mrinversion"] is_log = meta.get("log", False) if is_log: # unit = self.inverse_dimension.coordinates.unit coords = np.log10(self.inverse_dimension.coordinates.value) self.inverse_dimension = cp.as_dimension( array=coords, label=meta["label"] ) if len(s.dimensions) > 1: self.f.dimensions[1] = s.dimensions[1] self.f.dimensions[0] = self.inverse_dimension
[docs] def predict(self, K): r"""Predict the signal using the linear model. Args ---- K: ndarray A :math:`m \times n` kernel matrix, :math:`{\bf K}`. A numpy array of shape (m, n). Return ------ ndarray A numpy array of shape (m, m_count) with the predicted values """ f = self.f.y[0].components[0].T if isinstance(self.f, cp.CSDM) else self.f return np.dot(K, f)
[docs] def residuals(self, K, s): r"""Return the residual as the difference the data and the predicted data(fit), following .. math:: \text{residuals} = {\bf s - Kf^*} where :math:`{\bf f^*}` is the optimum solution. Args ---- K: ndarray. A :math:`m \times n` kernel matrix, :math:`{\bf K}`. A numpy array of shape (m, n). s: ndarray ot CSDM object. A csdm object or a :math:`m \times m_\text{count}` signal matrix, :math:`{\bf s}`. Return ------ ndarray or CSDM object. If `s` is a csdm object, returns a csdm object with the residuals. If `s` is a numpy array, return a :math:`m \times m_\text{count}` residue matrix. csdm object """ s_ = s.y[0].components[0].T if isinstance(s, cp.CSDM) else s predict = np.squeeze(self.predict(K)) residue = s_ - predict if not isinstance(s, cp.CSDM): return residue residue = cp.as_csdm(residue.T.copy()) residue._dimensions = s._dimensions return residue
[docs]class LassoFistaCV: def __init__( self, lambdas, folds=10, max_iterations=50000, tolerance=5.2e-3, positive=True, sigma=0.0, randomize=False, times=1, inverse_dimension=None, n_jobs=CPU_COUNTS, ): self.cv_lambdas = lambdas self.folds = folds self.max_iterations = max_iterations self.tolerance = tolerance self.positive = positive self.sigma = sigma self.randomize = randomize self.times = times self.inverse_dimension = inverse_dimension self.n_jobs = n_jobs # self.warm_start = warm_start self.hyperparameters = {} self.f = None
[docs] def fit(self, K, s): r"""Fit the model using the coordinate descent method from scikit-learn for all alpha anf lambda values using the `n`-folds cross-validation technique. The cross-validation metric is the mean squared error. Args: K: A :math:`m \times n` kernel matrix, :math:`{\bf K}`. A numpy array of shape (m, n). s: A :math:`m \times m_\text{count}` signal matrix, :math:`{\bf s}` as a csdm object or a numpy array or shape (m, m_count). """ s_, self.scale = prepare_signal(s) sin_val = np.linalg.svd(K, full_matrices=False)[1] K_, s_ = np.asfortranarray(K), np.asfortranarray(s_) lipszit = sin_val[0] ** 2 # test train CV set k_train, s_train, k_test, s_test, m = test_train_set( K_, s_, self.folds, self.randomize, self.times ) self.cv_map, self.std, self.prediction_error, _ = fista_cv( matrix=k_train, s=s_train, matrix_test=k_test, s_test=s_test, lambda_vals=self.cv_lambdas, max_iter=self.max_iterations, nonnegative=int(self.positive), l_inv=(1 / lipszit), tol=self.tolerance, ) # subtract the variance. self.cv_map -= (self.sigma / self.scale) ** 2 self.cv_map = np.abs(self.cv_map) lambdas = np.log10(self.cv_lambdas) l1_index, l2_index = calculate_opt_lambda(self.cv_map, self.std) lambda1, _ = lambdas[l1_index], lambdas[l2_index] self.hyperparameters["lambda"] = 10**lambda1 # Calculate the solution using the complete data at the optimized lambda self.opt = LassoFista( lambda1=self.hyperparameters["lambda"], max_iterations=self.max_iterations, tolerance=self.tolerance, positive=self.positive, inverse_dimension=self.inverse_dimension, ) self.opt.fit(K, s) self.f = self.opt.f # convert cv_map to csdm self.cv_map = cp.as_csdm(np.squeeze(self.cv_map.T.copy())) self.cv_map.y[0].component_labels = ["log10"] d0 = cp.as_dimension(np.log10(self.cv_lambdas), label="log(λ)") self.cv_map.dimensions[0] = d0 self.cross_validation_curve = self.cv_map
[docs] def predict(self, K): r"""Predict the signal using the linear model. Args ---- K: ndarray A :math:`m \times n` kernel matrix, :math:`{\bf K}`. A numpy array of shape (m, n). Return ------ ndarray A numpy array of shape (m, m_count) with the predicted values """ return self.opt.predict(K)
[docs] def residuals(self, K, s): r"""Return the residual as the difference the data and the predicted data(fit), following .. math:: \text{residuals} = {\bf s - Kf^*} where :math:`{\bf f^*}` is the optimum solution. Args ---- K: ndarray. A :math:`m \times n` kernel matrix, :math:`{\bf K}`. A numpy array of shape (m, n). s: ndarray ot CSDM object. A csdm object or a :math:`m \times m_\text{count}` signal matrix, :math:`{\bf s}`. Return ------ ndarray or CSDM object. If `s` is a csdm object, returns a csdm object with the residuals. If `s` is a numpy array, return a :math:`m \times m_\text{count}` residue matrix. csdm object """ return self.opt.residuals(K, s)
[docs] def cv_plot(self): """Plot the CV plot""" cv = self.cv_map.y[0].components[0] prediction_error = self.prediction_error std = self.std l1_idx, l2_idx = calculate_opt_lambda(cv, std) lambdas = np.log10(self.cv_lambdas) # opt_lambda = 0.5 * (lambdas[l1_idx] + lambdas[l2_idx]) plt.axhline(y=std[l1_idx] + cv[l1_idx], linestyle="--", c="r") plt.plot(lambdas, prediction_error, alpha=0.5, linestyle="dotted") kwargs = {"s": 70, "edgecolors": "k", "linewidth": 1.5} plt.scatter(lambdas[l1_idx], cv[l1_idx], facecolors="b", **kwargs) # plt.scatter(lambdas[l2_idx], cv[l2_idx], facecolors="r", **kwargs) # plt.axvline(x=opt_lambda, linestyle="--", c="g", label="$\\lambda^*$") plt.plot(lambdas, cv, c="k", alpha=1, label="CV curve") plt.yscale("log") # plt.errorbar(lambdas, cv, std, c='k', alpha=0.2) # plt.ylim([0, cv.max()]) plt.legend() plt.xlabel(r"log10 ($\lambda$)") plt.ylabel("test error")
def calculate_opt_lambda(cv, std): l1_index = np.unravel_index(cv.argmin(), cv.shape)[0] cv_std = cv[l1_index] + std[l1_index] temp = np.where(cv < cv_std)[0] if temp.size != 0: index = np.where(temp > l1_index)[0] if index.size != 0: index = index.max() l2_index = temp[index] return l1_index, l2_index return l1_index, l1_index def test_train_set(X, y, folds, random=False, repeat_folds=1): """Generate test and training sets""" index = np.arange(X.shape[0]) test_size = int(index.size / folds) m = index.size % folds train_size = index.size - test_size shape_k_train = (train_size, X.shape[1], folds * repeat_folds) k_train = np.zeros(shape_k_train) shape_s_train = (train_size, y.shape[1], folds * repeat_folds) s_train = np.zeros(shape_s_train) shape_k_test = (test_size + 1, X.shape[1], folds * repeat_folds) k_test = np.zeros(shape_k_test) shape_s_test = (test_size + 1, y.shape[1], folds * repeat_folds) s_test = np.zeros(shape_s_test) for j in range(repeat_folds): if random: np.random.shuffle(index) for i in range(folds): if random: if i < m: test_index = index[i * (test_size + 1) : (i + 1) * (test_size + 1)] set_index = np.arange( i * (test_size + 1), (i + 1) * (test_size + 1) ) else: test_index = index[i * test_size + m : (i + 1) * test_size + m] set_index = np.arange(i * test_size + m, (i + 1) * test_size + m) train_index = np.delete(index, set_index) else: if i < m: test_index = index[i:None:folds][: test_size + 1] else: test_index = index[i:None:folds][:test_size] train_index = np.delete(index, test_index) k_train[: train_index.size, :, j * folds + i] = X[train_index] s_train[: train_index.size, :, j * folds + i] = y[train_index] k_test[: test_index.size, :, j * folds + i] = X[test_index] s_test[: test_index.size, :, j * folds + i] = y[test_index] return k_train, s_train, k_test, s_test, m