from typing import List, Callable
import numpy as np
import numpy.typing as npt
from suppy.feasibility._bands._ams_extrapolations import AdaptiveStepLandweberHyperslab
from suppy.utils import ensure_float_array
from suppy.perturbations import Perturbation, DummyPerturbation
from ._sup import FeasibilityPerturbation
try:
import cupy as cp
NO_GPU = False
except ImportError:
cp = np
NO_GPU = True
[docs]
class SplitSuperiorization(FeasibilityPerturbation):
"""
A class used to perform split superiorization on a given feasibility
problem.
Parameters
----------
basic : object
An instance of a split problem.
input_perturbation_scheme : Perturbation or None, optional
Perturbation scheme for the input, by default None.
target_perturbation_scheme : Perturbation or None, optional
Perturbation scheme for the target, by default None.
input_objective_tol : float, optional
Tolerance for the input objective function, by default 1e-4.
target_objective_tol : float, optional
Tolerance for the target objective function, by default 1e-4.
prox_tol : float, optional
Tolerance for the constraint, by default 1e-6.
Attributes
----------
input_perturbation_scheme : Perturbation or None
Perturbation scheme for the input.
target_perturbation_scheme : Perturbation or None
Perturbation scheme for the target.
input_objective_tol : float
Tolerance for the input objective function.
target_objective_tol : float
Tolerance for the target objective function.
prox_tol : float
Tolerance for the constraint.
input_f_k : float
The current objective function value for the input.
target_f_k : float
The current objective function value for the target.
p_k : float
The current proximity function value.
_k : int
The current iteration number.
all_x_values : list
Array storing all points achieved via the superiorization algorithm.
all_function_values : list
Array storing all objective function values achieved via the superiorization algorithm.
all_x_values_function_reduction : list
Array storing all points achieved via the function reduction step.
all_function_values_function_reduction : list
Array storing all objective function values achieved via the function reduction step.
"""
def __init__(
self,
basic, # needs to be a split problem
input_perturbation_scheme: Perturbation | None = None,
target_perturbation_scheme: Perturbation | None = None,
):
super().__init__(basic)
if input_perturbation_scheme is None and target_perturbation_scheme is None:
raise ValueError(
"At least one perturbation scheme must be provided for SplitSuperiorization."
)
self.input_perturbation_scheme = (
input_perturbation_scheme
if input_perturbation_scheme is not None
else DummyPerturbation()
)
self.target_perturbation_scheme = (
target_perturbation_scheme
if target_perturbation_scheme is not None
else DummyPerturbation()
)
# initialize some variables for the algorithms
self.input_f_k = None
self.target_f_k = None
self.p_k = None
self._k = 0
self.all_x = []
self.all_function_values = []
self.proximities = []
self.all_x_function_reduction = []
self.all_function_values_function_reduction = []
self.proximities_function_reduction = []
self.all_x_basic = []
self.all_function_values_basic = []
self.proximities_basic = []
[docs]
@ensure_float_array
def solve(
self,
x: npt.NDArray,
max_iter: int = 10,
prox_tol: float = 1e-6,
del_prox_tol: float = 1e-8,
del_prox_n: int = 5,
proximity_measures: List | None = None,
del_input_objective_tol: float = 1e-6,
del_input_objective_n: int = 5,
del_target_objective_tol: float = 1e-6,
del_target_objective_n: int = 5,
storage: bool = False,
storage_iters: List[int] | int | None = None,
alternative_stopping_criterion: Callable | None = None,
alternative_stopping_criterion_initial_call: Callable | None = None,
) -> np.ndarray:
"""
Solves the optimization problem using the superiorization method.
Parameters
----------
x : npt.NDArray
Starting point for the algorithm.
max_iter : int, optional
Maximum number of iterations to perform, by default 500.
prox_tol : float, optional
The tolerance for the proximity on the constraints, by default 1e-6.
del_prox_tol : float, optional
The tolerance for the change in proximity over the last del_prox_n iterations, by default 1e-8.
del_prox_n : int, optional
The number of iterations that del_prox_tol needs to be met in a row, by default 5.
proximity_measures : List, optional
The proximity measures to calculate, by default a l2 norm measure is used. Right now only the first in the list is used to check the feasibility.
del_input_objective_tol
The tolerance for change in the objective function over the last del_input_objective_n iterations, by default 1e-8.
del_input_objective_n
The number of iterations that del_input_objective_tol needs to be met in a row, by default 5.
del_target_objective_tol
The tolerance for change in the objective function over the last del_target_objective_n iterations, by default 1e-8.
del_target_objective_n
The number of iterations that del_target_objective_tol needs to be met in a row, by default 5.
storage : bool, optional
Flag indicating whether to store intermediate solutions, by default False.
storage_iters : List[int] or int, optional
Controls which iterations are stored (when storage=True). If None, all iterations are stored.
If a list of ints, only those iteration indices are stored (0 = initial point).
If an int, storage occurs every that many iterations.
alternative_stopping_criterion : callable, optional
Alternative stopping criterion
alternative_stopping_criterion_initial_call : callable, optional
Initial call for an alternative stopping criterion
Returns
-------
npt.NDArray
The superiorized solution after performing the superiorization method.
"""
def _should_store(idx):
if storage_iters is None:
return True
if isinstance(storage_iters, int):
return idx % storage_iters == 0
return idx in storage_iters
# initialization of variables
if proximity_measures is None:
proximity_measures = [("p_norm", 2)]
else:
# TODO: check that proximity measures are valid
_ = None
self.input_perturbation_scheme.reset() # reset the input perturbation scheme
self.target_perturbation_scheme.reset() # reset the target perturbation scheme
self._n_tol_input_objective = (
0 # number of iterations with objective function changes below threshold
)
self._n_tol_target_objective = (
0 # number of iterations with objective function changes below threshold
)
self._n_tol_prox = 0 # number of iterations with proximity changes below threshold
self.t = [0] # array storing the time for each iteration
self.l = []
x_0 = x.copy()
self._initial_storage(
x,
storage and _should_store(0),
[
self.input_perturbation_scheme.func(x_0),
self.target_perturbation_scheme.func(self.basic.map(x_0)),
],
self.basic.proximity(x_0, proximity_measures),
)
self._k = 0 # reset counter if necessary
if alternative_stopping_criterion_initial_call is not None:
stop = alternative_stopping_criterion_initial_call(x, self)
else:
stop = False # criterion for stopping the algorithm
# initial function and proximity values
# self.input_f_k = self.input_perturbation_scheme.func(x_0)
# self.target_f_k = self.target_perturbation_scheme.func(y)
# self.p_k = self.basic.proximity(x_0, proximity_measures)
# # if storage:
# # self._initial_storage(x_0,self.perturbation_scheme.func(x_0))
y = None
while self._k < max_iter and not stop:
# check if a restart should be performed
# perform the perturbation schemes update steps and pre steps
self.input_perturbation_scheme.pre_step(
x,
last_proximity=self.proximities[-1][0],
last_proximity_basic=self.proximities_basic[-1][0],
last_proximity_function_reduction=self.proximities_function_reduction[-1][0],
last_function_value=self.all_function_values[-1][0],
last_function_value_basic=self.all_function_values_basic[-1][0],
)
x = self.input_perturbation_scheme.perturbation_step(x)
self.target_perturbation_scheme.pre_step(
y,
last_proximity=self.proximities[-1][1],
last_proximity_basic=self.proximities_basic[-1][1],
last_proximity_function_reduction=self.proximities_function_reduction[-1][1],
last_function_value=self.all_function_values[-1][1],
last_function_value_basic=self.all_function_values_basic[-1][1],
)
y = self.target_perturbation_scheme.perturbation_step(y)
# post steps
self.input_perturbation_scheme.post_step(
x,
last_proximity=self.proximities[-1][0],
last_proximity_basic=self.proximities_basic[-1][0],
last_proximity_function_reduction=self.proximities_function_reduction[-1][0],
last_function_value=self.all_function_values[-1][0],
last_function_value_basic=self.all_function_values_basic[-1][0],
)
self.target_perturbation_scheme.post_step(
y,
last_proximity=self.proximities[-1][1],
last_proximity_basic=self.proximities_basic[-1][1],
last_proximity_function_reduction=self.proximities_function_reduction[-1][1],
last_function_value=self.all_function_values[-1][1],
last_function_value_basic=self.all_function_values_basic[-1][1],
)
self.storage(
x,
kind="function_reduction",
storage=storage and _should_store(self._k + 1),
f=[
self.input_perturbation_scheme.func(x),
self.target_perturbation_scheme.func(y),
],
p=self.basic.proximity(x, proximity_measures),
)
# perform basic step
x, y = self.basic.step(x, y)
self.storage(
x,
kind="basic",
storage=storage and _should_store(self._k + 1),
f=[
self.input_perturbation_scheme.func(x),
self.target_perturbation_scheme.func(y),
],
p=self.basic.proximity(x, proximity_measures),
)
self._k += 1
# enable different stopping criteria for different superiorization algorithms
if alternative_stopping_criterion is not None:
stop = alternative_stopping_criterion(x, self)
else:
stop = self._stopping_criterion(
del_input_objective_tol,
del_input_objective_n,
del_target_objective_tol,
del_target_objective_n,
prox_tol,
del_prox_tol,
del_prox_n,
)
self._additional_action(x, y)
self._post_step(x)
return x
def _stopping_criterion(
self,
del_input_objective_tol: float,
del_input_objective_n: int,
del_target_objective_tol: float,
del_target_objective_n: int,
prox_tol: float,
del_prox_tol: float,
del_prox_n: int,
) -> bool:
""""""
stop_objective = False # variable to check if the objective function criteria is met
stop_objective_input = (
abs(self.all_function_values[-3][0] - self.all_function_values[-1][0])
/ max(1, self.all_function_values[-3][0])
< del_input_objective_tol
)
stop_objective_target = (
abs(self.all_function_values[-3][1] - self.all_function_values[-1][1])
/ max(1, self.all_function_values[-3][1])
< del_target_objective_tol
)
if stop_objective_input:
self._n_tol_input_objective += 1
else:
self._n_tol_input_objective = 0
if stop_objective_target:
self._n_tol_target_objective += 1
else:
self._n_tol_target_objective = 0
if (self._n_tol_input_objective >= del_input_objective_n) and (
self._n_tol_target_objective >= del_target_objective_n
): # n objective function changes in input AND output space below threshold
stop_objective = True
stop_prox = False # variable to check if the proximity criteria is met
# check if proximity values are below the threshold
if self.proximities[-1][1][0] < prox_tol: # proximity below goal/tolerance
stop_prox = True
# check if the proximity changes are below tolerance level
if (
abs(self.proximities[-3][1][0] - self.proximities[-1][1][0])
/ max(1, self.proximities[-3][1][0])
< del_prox_tol
):
self._n_tol_prox += 1
else:
self._n_tol_prox = 0
if self._n_tol_prox >= del_prox_n: # n proximity changes below threshold
stop_prox = True
# check if both criteria are met
return stop_objective and stop_prox
def _additional_action(self, x: npt.NDArray, y: npt.NDArray):
"""
Perform an additional action on the given inputs.
Parameters
----------
x : np.ndarray
Description of parameter `x`.
y : np.ndarray
Description of parameter `y`.
Returns
-------
None
"""
def _initial_storage(self, x: npt.NDArray, storage: bool, f: list, p: list):
"""
Initializes storage for objective values and appends initial values.
Parameters
----------
x : np.ndarray
Initial values of the variables.
f : np.ndarray
Initial values of the objective function.
p : np.ndarray
Proximity function value
"""
# reset objective values
self.all_x = []
self.all_function_values = [] # array storing all objective function values
self.proximities = [] # array storing all proximity function values
self.all_x_function_reduction = []
self.all_function_values_function_reduction = []
self.proximities_function_reduction = []
self.all_x_basic = []
self.all_function_values_basic = []
self.proximities_basic = []
# append initial values
f_temp = []
for el in f:
if not NO_GPU and isinstance(el, cp.ndarray):
f_temp.append(el.get())
else:
f_temp.append(el)
# modify proximities
p_temp = []
for el in p:
if not NO_GPU and isinstance(el, cp.ndarray):
p_temp.append(el.get())
else:
p_temp.append(el)
self.all_function_values.append(f_temp)
self.all_function_values_basic.append(f_temp)
self.all_function_values_function_reduction.append(f_temp)
self.proximities.append(p_temp)
self.proximities_basic.append(p_temp)
self.proximities_function_reduction.append(p_temp)
if storage:
if isinstance(x, np.ndarray):
self.all_x.append(np.array(x.copy()))
self.all_x_basic.append(x.copy())
self.all_x_function_reduction.append(x.copy())
else:
self.all_x.append((x.get()))
self.all_x_basic.append((x.get()))
self.all_x_function_reduction.append((x.get()))
[docs]
def storage(
self,
x: npt.NDArray,
kind: str,
storage: bool = True,
f: list | None = None,
p: list | None = None,
):
"""
Stores the given values of x and f into the corresponding lists.
Parameters
----------
x : npt.NDArray
The current value of the variable x to be stored.
kind : str
The type of storage to be used, either "function_reduction" or "basic".
storage : bool, optional
If True, store the values of x
"""
# always store all function and proximity values
f_temp = []
for el in f:
if not NO_GPU and isinstance(el, cp.ndarray):
f_temp.append(el.get())
else:
f_temp.append(el)
self.all_function_values.append(f_temp)
# modify proximities
p_temp = []
for el in p:
if not NO_GPU and isinstance(el, cp.ndarray):
p_temp.append(el.get())
else:
p_temp.append(el)
self.proximities.append(p_temp)
if storage and isinstance(x, np.ndarray):
self.all_x.append(x.copy())
elif storage:
self.all_x.append((x.get()))
if kind == "function_reduction":
self.all_function_values_function_reduction.append(f_temp)
self.proximities_function_reduction.append(p_temp)
if storage and isinstance(x, np.ndarray):
self.all_x_function_reduction.append(x.copy())
elif storage:
self.all_x_function_reduction.append((x.get()))
elif kind == "basic":
self.all_function_values_basic.append(f_temp)
self.proximities_basic.append(p_temp)
if storage and isinstance(x, np.ndarray):
self.all_x_basic.append(x.copy())
elif storage:
self.all_x_basic.append((x.get()))
else:
raise ValueError("Invalid storage type. Use 'function_reduction' or 'basic'.")
def _post_step(self, x: npt.NDArray):
"""
Perform an action after the optimization process has finished.
Parameters
----------
x : array-like
The current value of the variable x.
Returns
-------
None
"""
self.all_x = np.array(self.all_x)
self.all_x_function_reduction = np.array(self.all_x_function_reduction)
self.all_x_basic = np.array(self.all_x_basic)
self.all_function_values = np.array(self.all_function_values)
self.all_function_values_function_reduction = np.array(
self.all_function_values_function_reduction
)
self.all_function_values_basic = np.array(self.all_function_values_basic)
self.proximities = np.array(self.proximities)
self.proximities_function_reduction = np.array(self.proximities_function_reduction)
self.proximities_basic = np.array(self.proximities_basic)