Source code for suppy.feasibility._split_algorithms

"""Algorithms for split feasibility problem."""
import warnings
from abc import ABC, abstractmethod
from typing import List, Callable
import numpy as np
import numpy.typing as npt
from scipy import sparse

try:
    import cupy as cp

    NO_GPU = False
except ImportError:
    cp = np
    NO_GPU = True

from suppy.utils import LinearMapping
from suppy.utils import ensure_float_array
from suppy.projections._projections import Projection

from suppy.feasibility._linear_algorithms import Feasibility


[docs] class SplitFeasibility(Feasibility, ABC): """ Abstract base class used to represent split feasibility problems. Parameters ---------- A : npt.NDArray Matrix connecting input and target space. algorithmic_relaxation : npt.NDArray or float, optional The relaxation parameter used by the algorithm, by default 1.0. proximity_flag : bool, optional A flag indicating whether to use this object for proximity calculations, by default True. Attributes ---------- A : LinearMapping Linear mapping between input and target space. proximities : list A list to store proximity values during the solve process. algorithmic_relaxation : npt.NDArray or float, optional The relaxation parameter used by the algorithm, by default 1.0. proximity_flag : bool, optional A flag indicating whether to use this object for proximity calculations. relaxation : float, optional The relaxation parameter for the projection, by default 1.0. """ def __init__( self, A: npt.NDArray | sparse.sparray, algorithmic_relaxation: npt.NDArray | float = 1.0, proximity_flag: bool = True, ): _, _use_gpu = LinearMapping.get_flags(A) super().__init__(algorithmic_relaxation, 1, proximity_flag, _use_gpu=_use_gpu) self.A = LinearMapping(A) self.proximities = [] self.all_x = None
[docs] @ensure_float_array def solve( self, x: npt.NDArray, max_iter: int = 10, prox_tol: float = 1e-6, del_prox_tol: float = 1e-8, del_prox_n: int = 5, storage: bool = False, storage_iters: List[int] | int | None = None, proximity_measures: List | None = None, alternative_stopping_criterion: Callable | None = None, alternative_stopping_criterion_initial_call: Callable | None = None, ) -> np.ndarray: """ Solves the split feasibility problem for a given input array. Parameters ---------- x : npt.NDArray Starting point for the algorithm. max_iter : int, optional The maximum number of iterations (default is 10). prox_tol : float, optional Stopping criterium for the feasibility seeking algorithm. Solution deemed feasible if the proximity drops below this value (default is 1e-6). del_prox_tol : float, optional The tolerance for the change in proximity over the last del_prox_n iterations, by default 1e-8. del_prox_n : int, optional The number of iterations to check for the change in proximity, by default 5. storage : bool, optional A flag indicating whether to store all intermediate solutions (default is False). storage_iters : List[int] or int, optional Controls which iterations are stored (when storage=True). If None, all iterations are stored. If a list of ints, only those iteration indices are stored (0 = initial point). If an int, storage occurs every that many iterations. proximity_measures : List, optional The proximity measures to calculate, by default None. Right now only the first in the list is used to check the feasibility. alternative_stopping_criterion : callable, optional Alternative stopping criterion. alternative_stopping_criterion_initial_call : callable, optional Initial call for an alternative stopping criterion. Returns ------- npt.NDArray The solution after applying the feasibility seeking algorithm. """ xp = cp if isinstance(x, cp.ndarray) else np self._n_tol = 0 if proximity_measures is None: proximity_measures = [("p_norm", 2)] else: # TODO: Check if the proximity measures are valid _ = None self.proximities = [self.proximity(x, proximity_measures)] i = 0 def _should_store(idx): if storage_iters is None: return True if isinstance(storage_iters, int): return idx % storage_iters == 0 return idx in storage_iters if storage is True: self.all_x = [] if _should_store(0): if isinstance(x, cp.ndarray) and not NO_GPU: self.all_x.append((x.get())) else: self.all_x.append(np.array(x.copy())) if alternative_stopping_criterion_initial_call is not None: stop = alternative_stopping_criterion_initial_call(x, self) else: stop = False while i < max_iter and not stop: x, _ = self.step(x) if storage is True and _should_store(i + 1): if isinstance(x, np.ndarray): # convert to np array if cp self.all_x.append(np.array(x.copy())) else: self.all_x.append((x.get())) self.proximities.append(self.proximity(x, proximity_measures)) if alternative_stopping_criterion is not None: stop = alternative_stopping_criterion(x, self) else: stop = self._stopping_criterion(prox_tol, del_prox_tol, del_prox_n) i += 1 if self.all_x is not None: self.all_x = np.array(self.all_x) self.proximities = xp.array(self.proximities) return x
def _stopping_criterion(self, prox_tol: float, del_prox_tol: float, del_prox_n: int) -> bool: """Returns True when convergence is detected, False otherwise.""" if self.proximities[-1][0] < prox_tol: return True else: # check that last n proximity changes are below a threshold if self.proximities[-2][0] - self.proximities[-1][0] < del_prox_tol: self._n_tol += 1 else: self._n_tol = 0 if self._n_tol >= del_prox_n: return True return False
[docs] def project(self, x: npt.NDArray, y: npt.NDArray | None = None) -> tuple: """ Projects the input array onto the feasible set. Parameters ---------- x : npt.NDArray The input array to project. y : npt.NDArray, optional An optional array for projection (default is None). Returns ------- tuple A (x, y) pair of projected arrays; y may be None. """ return self._project(x, y)
[docs] def step(self, x: npt.NDArray, y: npt.NDArray | None = None) -> tuple: return self.project(x, y)
@abstractmethod def _project(self, x: npt.NDArray, y: npt.NDArray | None = None) -> tuple: pass
[docs] def map(self, x: npt.NDArray) -> np.ndarray: """ Maps the input space array to the target space via matrix multiplication. Parameters ---------- x : npt.NDArray The input space array to be mapped. Returns ------- npt.NDArray The corresponding target space array. """ return self.A @ x
[docs] def map_back(self, y: npt.NDArray) -> np.ndarray: """ Transposed map of the target space array to the input space. Parameters ---------- y : npt.NDArray The target space array to map. Returns ------- npt.NDArray The corresponding array in input space. """ return self.A.T @ y
[docs] class CQAlgorithm(SplitFeasibility): """ Implementation for the CQ algorithm to solve split feasibility problems. Parameters ---------- A : npt.NDArray Matrix connecting input and target space. C_projection : Projection The projection operator onto the set C. Q_projection : Projection The projection operator onto the set Q. algorithmic_relaxation : npt.NDArray or float, optional The relaxation parameter used by the algorithm, by default 1.0. proximity_flag : bool, optional A flag indicating whether to use this object for proximity calculations, by default True. Attributes ---------- A : LinearMapping Linear mapping between input and target space. C_projection : Projection The projection operator onto the set C. Q_projection : Projection The projection operator onto the set Q. proximities : list A list to store proximity values during the solve process. algorithmic_relaxation : npt.NDArray or float, optional The relaxation parameter used by the algorithm, by default 1.0. proximity_flag : bool A flag indicating whether to use this object for proximity calculations. """ def __init__( self, A: npt.NDArray | sparse.sparray, C_projection: Projection, Q_projection: Projection, algorithmic_relaxation: float = 1.0, proximity_flag: bool = True, ): super().__init__(A, algorithmic_relaxation, proximity_flag) self.c_projection = C_projection self.q_projection = Q_projection def _project(self, x: npt.NDArray, y: npt.NDArray | None = None) -> tuple: """ Perform one step of the CQ algorithm. Parameters ---------- x : npt.NDArray The point in the input space to be projected. y : npt.NDArray or None, optional The point in the target space to be projected, obtained through e.g. a perturbation step. If None, it is calculated from x. Returns ------- tuple A (x, None) pair with the updated input-space point. """ if y is None: y = self.map(x) y_p = self.q_projection.project(y.copy()) x = x - self.algorithmic_relaxation * self.map_back(y - y_p) return self.c_projection.project(x), None def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]: return [ self.c_projection.proximity(x, proximity_measures), self.q_projection.proximity(self.map(x), proximity_measures), ] def _stopping_criterion(self, prox_tol: float, del_prox_tol: float, del_prox_n: int) -> bool: """Returns True when convergence is detected, False otherwise.""" if self.proximities[-1][1][0] < prox_tol: return True else: # check that last n proximity changes of are below a threshold if self.proximities[-2][1][0] - self.proximities[-1][1][0] < del_prox_tol: self._n_tol += 1 else: self._n_tol = 0 if self._n_tol >= del_prox_n: return True return False
class ProductSpaceAlgorithm(SplitFeasibility): """ Implementation for a product space algorithm to solve split feasibility problems. Parameters ---------- A : npt.NDArray Matrix connecting input and target space. C_projections : List[Projection] The projection operators onto the sets C. Q_projections : List[Projection] The projection operators onto the sets Q. algorithmic_relaxation : npt.NDArray or float, optional The relaxation parameter used by the algorithm, by default 1.0. proximity_flag : bool, optional A flag indicating whether to use this object for proximity calculations, by default True. Attributes ---------- Pv : npt.NDArray Projection matrix onto the constraint manifold. xs : list Input-space iterates accumulated during solve. ys : list Target-space iterates accumulated during solve. """ def __init__( self, A: npt.NDArray | sparse.sparray, C_projections: List[Projection], Q_projections: List[Projection], algorithmic_relaxation: npt.NDArray | float = 1.0, proximity_flag: bool = True, ): super().__init__(A, algorithmic_relaxation, proximity_flag) self.c_projections = C_projections self.q_projections = Q_projections # calculate projection back into Ax=b space Z = np.concatenate([A, -1 * np.eye(A.shape[0])], axis=1) self.Pv = np.eye(Z.shape[1]) - LinearMapping(Z.T @ (np.linalg.inv(Z @ Z.T)) @ Z) warnings.warn( "ProductSpaceAlgorithm is only suitable for small scale problems. " "Use CQAlgorithm for larger problems.", stacklevel=2, ) self.xs = [] self.ys = [] def _project(self, x: npt.NDArray, y: npt.NDArray | None = None) -> tuple: """ Perform one step of the product space algorithm. Parameters ---------- x : npt.NDArray The point in the input space to be projected. y : npt.NDArray or None, optional The point in the target space to be projected, obtained through e.g. a perturbation step. If None, it is calculated from x. Returns ------- tuple A (x, None) pair with the updated input-space point. """ if y is None: y = self.map(x) for el in self.c_projections: x = el.project(x) for el in self.q_projections: y = el.project(y) xy = self.Pv @ np.concatenate([x, y]) self.xs.append(xy[: len(x)].copy()) self.ys.append(xy[len(x) :].copy()) return xy[: len(x)], None def _proximity(self, x: npt.NDArray, _proximity_measures: List) -> list[float]: raise NotImplementedError("Proximity not implemented for ProductSpaceAlgorithm.")