Source code for suppy.feasibility._halfspaces._ams_algorithms

import warnings
from abc import ABC
from typing import List
import numpy as np
import numpy.typing as npt

try:
    import cupy as cp

    NO_GPU = False

except ImportError:
    NO_GPU = True
    cp = np

from suppy.feasibility._linear_algorithms import HalfspaceFeasibility


class HalfspaceAlgorithm(HalfspaceFeasibility, ABC):
    """
    The HalfspaceAlgorithm class is used to find a feasible solution to a
    set of linear inequalities.

    Parameters
    ----------
    A : npt.NDArray or sparse.sparray
        Matrix for linear systems
    b : npt.NDArray
        Bound for linear systems
    algorithmic_relaxation : npt.NDArray or float, optional
        The relaxation parameter used by the algorithm, by default 1.0.
    relaxation : float, optional
        Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
    proximity_flag : bool, optional
        A flag indicating whether to use proximity in the algorithm, by default True.
    """

    def __init__(
        self,
        A: npt.NDArray,
        b: npt.NDArray,
        algorithmic_relaxation: npt.NDArray | float = 1.0,
        relaxation: float = 1.0,
        proximity_flag: bool = True,
    ):
        super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag)


[docs] class SequentialAMSHalfspace(HalfspaceAlgorithm): """ SequentialAMS class for sequentially applying the AMS algorithm. Parameters ---------- A : npt.NDArray or sparse.sparray Matrix for linear systems b : npt.NDArray Bound for linear systems algorithmic_relaxation : npt.NDArray or float, optional The relaxation parameter used by the algorithm, by default 1.0. relaxation : float, optional Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. cs : None or List[int], optional The list of indices for the constraints, by default None. proximity_flag : bool, optional Flag to indicate if proximity should be considered, by default True. """ def __init__( self, A: npt.NDArray, b: npt.NDArray, algorithmic_relaxation: npt.NDArray | float = 1.0, relaxation: float = 1.0, cs: None | List[int] = None, proximity_flag: bool = True, ): super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag) xp = cp if self._use_gpu else np if cs is None: self.cs = xp.arange(self.A.shape[0]) else: self.cs = cs def _project(self, x: npt.NDArray) -> np.ndarray: """ Projects the input array `x` onto the feasible region defined by the constraints. Parameters ---------- x : npt.NDArray The input array to be projected. Returns ------- npt.NDArray The projected array. """ for i in self.cs: p_i = self.single_map(x, i) res = self.b[i] - p_i if res < 0: self.A.update_step( x, self.algorithmic_relaxation * self.inverse_row_norm[i] * res, i ) return x
[docs] class SequentialWeightedAMSHalfspace(SequentialAMSHalfspace): """ Parameters ---------- A : npt.NDArray or sparse.sparray Matrix for linear systems b : npt.NDArray Bound for linear systems weights : None, list of float, or npt.NDArray, optional The weights assigned to each constraint. If None, default weights are used. algorithmic_relaxation : npt.NDArray or float, optional The relaxation parameter used by the algorithm, by default 1.0. relaxation : float, optional Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. weight_decay : float, optional Parameter that determines the rate at which the weights are reduced after each phase (weights * weight_decay). Default is 1.0. cs : None or list of int, optional The indices of the constraints to be considered. Default is None. proximity_flag : bool, optional Flag to indicate if proximity should be considered. Default is True. Attributes ---------- weights : npt.NDArray The weights assigned to each constraint. weight_decay : float Decay rate for the weights. temp_weight_decay : float Initial value for weight decay. """ def __init__( self, A: npt.NDArray, b: npt.NDArray, weights: None | List[float] | npt.NDArray = None, algorithmic_relaxation: npt.NDArray | float = 1.0, relaxation: float = 1.0, weight_decay: float = 1.0, cs: None | List[int] = None, proximity_flag: bool = True, ): super().__init__(A, b, algorithmic_relaxation, relaxation, cs, proximity_flag) xp = cp if self._use_gpu else np self.weight_decay = weight_decay self.temp_weight_decay = 1.0 if weights is None: self.weights = xp.ones(self.A.shape[0]) else: self.weights = weights def _project(self, x: npt.NDArray) -> np.ndarray: """ Projects the input array `x` onto a feasible region defined by the constraints. Parameters ---------- x : npt.NDArray The input array to be projected. Returns ------- npt.NDArray The projected array. Notes ----- This method iteratively adjusts the input array `x` based on the constraints defined in `self.cs`. For each constraint, it computes the projection and checks if the constraints are violated. If a constraint is violated, it updates the array `x` using a weighted relaxation factor. The weight decay is applied to the temporary weight decay after each iteration. """ weighted_relaxation = self.algorithmic_relaxation * self.temp_weight_decay for i in self.cs: p_i = self.single_map(x, i) res = self.b[i] - p_i if res < 0: self.A.update_step( x, weighted_relaxation * self.weights[i] * self.inverse_row_norm[i] * res, i ) self.temp_weight_decay *= self.weight_decay return x
[docs] class SimultaneousAMSHalfspace(HalfspaceAlgorithm): """ SimultaneousAMS is an implementation of the AMS (Alternating Minimization Scheme) algorithm that performs simultaneous projections and proximity calculations. Parameters ---------- A : npt.NDArray or sparse.sparray Matrix for linear systems b : npt.NDArray Bound for linear systems algorithmic_relaxation : npt.NDArray or float, optional The relaxation parameter used by the algorithm, by default 1.0. relaxation : float, optional Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. weights : None or List[float], optional The weights for the constraints, by default None. proximity_flag : bool, optional Flag to indicate if proximity calculations should be performed, by default True. """ def __init__( self, A: npt.NDArray, b: npt.NDArray, algorithmic_relaxation: npt.NDArray | float = 1.0, relaxation: float = 1.0, weights: None | List[float] = None, proximity_flag: bool = True, ): super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag) xp = cp if self._use_gpu else np if weights is None: self.weights = xp.ones(self.A.shape[0]) / self.A.shape[0] elif xp.abs((weights.sum() - 1)) > 1e-10: warnings.warn("Weights do not add up to 1! Renormalizing to 1...", stacklevel=2) self.weights = weights / weights.sum() else: self.weights = weights def _project(self, x: npt.NDArray) -> np.ndarray: # simultaneous projection p = self.map(x) res = self.b - p res_idx = res < 0 x += self.algorithmic_relaxation * ( self.weights[res_idx] * self.inverse_row_norm[res_idx] * res[res_idx] @ self.A[res_idx, :] ) return x def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]: p = self.map(x) res = self.b - p res[res > 0] = 0 res = -res measures = [] for measure in proximity_measures: if isinstance(measure, tuple): if measure[0] == "p_norm": measures.append(self.weights @ (res ** measure[1])) else: raise ValueError("Invalid proximity measure") elif isinstance(measure, str) and measure == "max_norm": measures.append(res.max()) else: raise ValueError("Invalid proximity measure") return measures
[docs] class BlockIterativeAMSHalfspace(HalfspaceAlgorithm): """ Block Iterative AMS Algorithm. This class implements a block iterative version of the AMS (Alternating Minimization Scheme) algorithm. It is designed to handle constraints and weights in a block-wise manner. Parameters ---------- A : npt.NDArray or sparse.sparray Matrix for linear systems b : npt.NDArray Bound for linear systems weights : List[List[float]] or List[npt.NDArray] A list of lists or arrays representing the weights for each block. Each list/array should sum to 1. algorithmic_relaxation : npt.NDArray or float, optional The relaxation parameter used by the algorithm, by default 1.0. relaxation : float, optional Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. proximity_flag : bool, optional A flag indicating whether to use proximity measures, by default True. Raises ------ ValueError If any of the weight lists do not sum to 1. """ def __init__( self, A: npt.NDArray, b: npt.NDArray, weights: List[List[float]] | List[npt.NDArray], algorithmic_relaxation: npt.NDArray | float = 1.0, relaxation: float = 1.0, proximity_flag: bool = True, ): super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag) xp = cp if self._use_gpu else np # check that weights is a list of lists that add up to 1 each for el in weights: if xp.abs((xp.sum(el) - 1)) > 1e-10: raise ValueError("Weights do not add up to 1!") self.weights = [] self.block_idxs = [ xp.where(xp.array(el) > 0)[0] for el in weights ] # get idxs that meet requirements # assemble a list of general weights self.total_weights = xp.zeros_like(weights[0]) for el in weights: el = xp.asarray(el) self.weights.append(el[xp.array(el) > 0]) # remove non zero weights self.total_weights += el / len(weights) def _project(self, x: npt.NDArray) -> np.ndarray: # simultaneous projection for el, block_idx in zip(self.weights, self.block_idxs): p = self.indexed_map(x, block_idx) res = self.b[block_idx] - p res_idx = res < 0 full_idx = block_idx[res_idx] x += self.algorithmic_relaxation * ( el[res_idx] * self.inverse_row_norm[full_idx] * res[res_idx] @ self.A[full_idx, :] ) return x def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]: p = self.map(x) res = self.b - p res[res > 0] = 0 res = -res measures = [] for measure in proximity_measures: if isinstance(measure, tuple): if measure[0] == "p_norm": measures.append(self.total_weights @ (res ** measure[1])) else: raise ValueError("Invalid proximity measure") elif isinstance(measure, str) and measure == "max_norm": measures.append(res.max()) else: raise ValueError("Invalid proximity measure") return measures
[docs] class StringAveragedAMSHalfspace(HalfspaceAlgorithm): """ StringAveragedAMS is an implementation of the HalfspaceAlgorithm that performs string averaged projections. Parameters ---------- A : npt.NDArray or sparse.sparray Matrix for linear systems b : npt.NDArray Bound for linear systems strings : List[List[int]] A list of lists, where each inner list represents a string of indices. algorithmic_relaxation : npt.NDArray or float, optional The relaxation parameter used by the algorithm, by default 1.0. relaxation : float, optional Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. weights : None or List[float], optional The weights for each string, by default None. If None, equal weights are assigned. proximity_flag : bool, optional A flag indicating whether to use proximity, by default True. """ def __init__( self, A: npt.NDArray, b: npt.NDArray, strings: List[List[int]], algorithmic_relaxation: npt.NDArray | float = 1.0, relaxation: float = 1.0, weights: None | List[float] = None, proximity_flag: bool = True, ): super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag) xp = cp if self._use_gpu else np self.strings = strings if weights is None: self.weights = xp.ones(len(strings)) / len(strings) else: if len(weights) != len(self.strings): raise ValueError("The number of weights must be equal to the number of strings.") self.weights = weights def _project(self, x): # string averaged projection x_c = x.copy() # create a general copy of x x -= x # reset x is this viable? for string, weight in zip(self.strings, self.weights): x_s = x_c.copy() for i in string: p_i = self.single_map(x_s, i) res_i = self.b[i] - p_i if res_i < 0: self.A.update_step( x_s, self.algorithmic_relaxation * self.inverse_row_norm[i] * res_i, i ) x += weight * x_s return x