Source code for suppy.feasibility._bands._ams_extrapolations

from abc import ABC
from typing import List
import numpy as np
import numpy.typing as npt
from scipy import sparse

try:
    import cupy as cp

    NO_GPU = False

except ImportError:
    NO_GPU = True
    cp = np


from suppy.feasibility._bands._ams_algorithms import SimultaneousAMSHyperslab


[docs] class ExtrapolatedLandweberHyperslab(SimultaneousAMSHyperslab): """ Extrapolated Landweber algorithm for solving linear inequalities of the form lb <= Ax <= ub. Parameters ---------- A : npt.NDArray or sparse.sparray Matrix for linear systems lb : npt.NDArray The lower bounds for the hyperslab. ub : npt.NDArray The upper bounds for the hyperslab. weights : None or List[float], optional The weights for the constraints, by default None. relaxation : float, optional The outer relaxation parameter, by default 1.0. proximity_flag : bool, optional A flag indicating whether to use proximity measures, by default True. References ---------- - [1] https://doi.org/10.1007/s11075-025-02163-0 - [2] https://doi.org/10.1007/978-3-642-30901-4 """ def __init__( self, A: npt.NDArray | sparse.sparray, lb: npt.NDArray, ub: npt.NDArray, relaxation: float = 1.0, weights: None | List[float] | npt.NDArray = None, proximity_flag: bool = True, ): super().__init__(A, lb, ub, 1.0, relaxation, weights, proximity_flag) self.a_i = self.A.row_norm(2, 2) # To avoid division by zero self.weight_norm = self.weights / self.a_i self.weight_norm[self.a_i == 0] = 0 # TODO: Division-by-zero guard above produces NaNs once all other criteria are met def _project(self, x: npt.NDArray) -> np.ndarray: xp = cp if self._use_gpu else np p = self.map(x) (res_l, res_u) = self.bounds.residual(p) d_idx = res_u < 0 c_idx = res_l < 0 if not (xp.any(d_idx)) and not (xp.any(c_idx)): return x t_u = self.weight_norm[d_idx] * res_u[d_idx] t_l = self.weight_norm[c_idx] * res_l[c_idx] t_u_2 = t_u @ self.A[d_idx, :] t_l_2 = t_l @ self.A[c_idx, :] sig = ((res_l[c_idx] @ t_l) + (res_u[d_idx] @ t_u)) / ((t_u_2 - t_l_2) @ (t_u_2 - t_l_2)) x += sig * (t_u_2 - t_l_2) if xp.isnan(x).any(): raise ValueError("NaN encountered in Extrapolated Landweber Hyperslab projection.") return x
[docs] class BlockIterativeExtrapolatedLandweberHyperslab(ExtrapolatedLandweberHyperslab): """ Block-iterative variant of the Extrapolated Landweber algorithm for hyperslabs, solving lb <= Ax <= ub block by block. Parameters ---------- A : npt.NDArray or sparse.sparray Matrix for linear systems. lb : npt.NDArray The lower bounds for the hyperslab. ub : npt.NDArray The upper bounds for the hyperslab. length_blocks : List[int] Number of rows in each block. Must sum to the total number of rows in A. relaxation : float, optional The outer relaxation parameter, by default 1.0. weights : None or List[float], optional The weights for the constraints, by default None. proximity_flag : bool, optional A flag indicating whether to use proximity measures, by default True. """ def __init__( self, A: npt.NDArray | sparse.sparray, lb: npt.NDArray, ub: npt.NDArray, length_blocks: List[int], relaxation: float = 1.0, weights: None | List[float] | npt.NDArray = None, proximity_flag: bool = True, ): super().__init__(A, lb, ub, relaxation, weights, proximity_flag) self.length_blocks = length_blocks self.As = [] self.ls = [] self.us = [] self.block_bounds = [] self.weight_norms = [] _lower_bound = 0 for i in range(len(self.length_blocks)): self.As.append(A[_lower_bound : _lower_bound + self.length_blocks[i]]) self.ls.append(lb[_lower_bound : _lower_bound + self.length_blocks[i]]) self.us.append(ub[_lower_bound : _lower_bound + self.length_blocks[i]]) self.weight_norms.append( self.weight_norm[_lower_bound : _lower_bound + self.length_blocks[i]] ) _lower_bound += self.length_blocks[i] def _project(self, x: npt.NDArray) -> np.ndarray: xp = cp if self._use_gpu else np for i, A_block in enumerate(self.As): p = A_block @ x res_l, res_u = p - self.ls[i], self.us[i] - p d_idx = res_u < 0 c_idx = res_l < 0 if not (xp.any(d_idx)) and not (xp.any(c_idx)): continue t_u = self.weight_norms[i][d_idx] * res_u[d_idx] t_l = self.weight_norms[i][c_idx] * res_l[c_idx] t_u_2 = t_u @ A_block[d_idx, :] t_l_2 = t_l @ A_block[c_idx, :] sig = ((res_l[c_idx] @ t_l) + (res_u[d_idx] @ t_u)) / ( (t_u_2 - t_l_2) @ (t_u_2 - t_l_2) ) x += sig * (t_u_2 - t_l_2) return x
[docs] class AdaptiveStepLandweberHyperslab(SimultaneousAMSHyperslab): """ Adaptive step-size Landweber algorithm for solving linear inequalities of the form lb <= Ax <= ub. Parameters ---------- A : npt.NDArray or sparse.sparray Matrix for linear systems lb : npt.NDArray The lower bounds for the hyperslab. ub : npt.NDArray The upper bounds for the hyperslab. weights : None or List[float], optional The weights for the constraints, by default None. relaxation : float, optional The outer relaxation parameter, by default 1.0. proximity_flag : bool, optional A flag indicating whether to use proximity measures, by default True. References ---------- - [1] https://doi.org/10.1515/jiip-2015-0082 """ def __init__( self, A: npt.NDArray | sparse.sparray, lb: npt.NDArray, ub: npt.NDArray, weights: None | List[float] | npt.NDArray = None, relaxation: float = 1.0, proximity_flag: bool = True, ): super().__init__(A, lb, ub, 1.0, relaxation, weights, proximity_flag) def _project(self, x: npt.NDArray) -> np.ndarray: xp = cp if self._use_gpu else np p = self.map(x) (res_l, res_u) = self.bounds.residual(p) d_idx = res_u < 0 c_idx = res_l < 0 if not (xp.any(d_idx)) and not (xp.any(c_idx)): return x u_t_u = (self.weights[d_idx] * res_u[d_idx]) @ self.A[d_idx, :] u_t_l = (self.weights[c_idx] * res_l[c_idx]) @ self.A[c_idx, :] u_diff = u_t_u - u_t_l Au_t = self.A @ u_diff sig = (u_diff @ u_diff) / (Au_t @ (self.weights * Au_t)) x += sig * u_diff return x
class AdaptiveStepLandweberHyperslab2(SimultaneousAMSHyperslab): """ Adaptive step-size Landweber algorithm (variant 2) for solving linear inequalities of the form lb <= Ax <= ub. Parameters ---------- A : npt.NDArray or sparse.sparray Matrix for linear systems lb : npt.NDArray The lower bounds for the hyperslab. ub : npt.NDArray The upper bounds for the hyperslab. relaxation : float, optional The outer relaxation parameter, by default 1.0. weights : None or List[float], optional The weights for the constraints, by default None. proximity_flag : bool, optional A flag indicating whether to use proximity measures, by default True. References ---------- - [1] https://doi.org/10.1515/jiip-2015-0082 """ def __init__( self, A: npt.NDArray | sparse.sparray, lb: npt.NDArray, ub: npt.NDArray, relaxation: float = 1.0, weights: None | List[float] | npt.NDArray = None, proximity_flag: bool = True, ): super().__init__(A, lb, ub, 1.0, relaxation, weights, proximity_flag) def _project(self, x: npt.NDArray) -> np.ndarray: xp = cp if self._use_gpu else np p = self.map(x) (res_l, res_u) = self.bounds.residual(p) d_idx = res_u < 0 c_idx = res_l < 0 res_u[~d_idx] = 0 res_l[~c_idx] = 0 if not (xp.any(d_idx)) and not (xp.any(c_idx)): return x idx = d_idx | c_idx # Note: weights[idx] has shape (k,); result of @ has shape (n,). # This formula is designed for cases where the weighted combination is applied element-wise. u_diff = self.weights[idx] * (res_u[idx] - res_l[idx]) @ self.A[idx, :] Au_t = self.A @ u_diff sig = (u_diff @ u_diff) / (Au_t @ (self.weights * Au_t)) x += sig * u_diff return x