"""Base class for perturbations applied to feasibility seeking algorithms."""
from abc import ABC, abstractmethod
from typing import Callable, List
import numpy as np
import numpy.typing as npt
from suppy.utils import FuncWrapper
try:
import cupy as cp
NO_GPU = False
except ImportError:
NO_GPU = True
cp = np
[docs]
class Perturbation(ABC):
"""
Abstract base class for perturbations applied to feasibility seeking
algorithms.
"""
[docs]
@abstractmethod
def perturbation_step(self, x: npt.NDArray) -> np.ndarray:
"""
Perform a perturbation step.
Parameters
----------
x : npt.NDArray
The input array to be perturbed.
Returns
-------
npt.NDArray
The perturbed array.
"""
[docs]
class ObjectivePerturbation(Perturbation, ABC):
"""
Base class for perturbations performed by decreasing an objective
function.
Parameters
----------
func : Callable
The objective function to be perturbed.
func_args : List
The arguments to be passed to the objective function.
n_red : int, optional
The number of reduction steps to perform in one perturbation iteration (default is 1).
Attributes
----------
func : FuncWrapper
A wrapped version of the objective function with its arguments.
n_red : int
The number of reduction steps to perform.
_k : int
Keeps track of the number of performed perturbations.
"""
def __init__(self, func: Callable, func_args: List, n_red: int = 1):
self.func = FuncWrapper(func, func_args)
self.n_red = n_red
self._k = 0
[docs]
def perturbation_step(self, x: npt.NDArray) -> np.ndarray:
"""
Perform n_red perturbation steps on the input array.
Parameters
----------
x : npt.NDArray
The input array to be perturbed.
Returns
-------
npt.NDArray
The perturbed array after applying the reduction steps.
"""
self._k += 1
n = 0
while n < self.n_red:
x = self._function_reduction_step(x)
n += 1
return x
@abstractmethod
def _function_reduction_step(self, x: npt.NDArray) -> np.ndarray:
"""
Abstract method to perform that should implement the individual
function reduction steps on the input array.
Needs to be implemented by subclasses.
Parameters
----------
x : npt.NDArray
Input array on which the reduction step is to be performed.
Returns
-------
npt.NDArray
The array after the reduction step has been applied.
"""
[docs]
def pre_step(self, x: npt.NDArray, *args, **kwargs) -> None:
"""
If required perform any form of step previous to each
perturbation in each iteration.
This method is intended to be overridden by subclasses to implement
specific pre-step logic. By default, it does nothing.
Parameters
----------
x : npt.NDArray
Current iterate.
"""
[docs]
def post_step(self, x: npt.NDArray, *args, **kwargs) -> None:
"""
If required perform any form of step after each perturbation in each
iteration.
This method is intended to be overridden by subclasses to implement
specific post-step logic. By default, it does nothing.
Parameters
----------
x : npt.NDArray
Current iterate.
"""
[docs]
def reset(self) -> None:
"""Reset the perturbation to its initial state."""
self._k = 0
class DummyPerturbation(ObjectivePerturbation):
"""Dummy perturbation that does not change the input."""
def __init__(self):
def _dummy_func(x):
"""Always returns 0."""
return 0
func_args = []
self.func = FuncWrapper(_dummy_func, func_args)
self.n_red = 1
self._k = 0 # keeps track of the number of performed perturbations
def _function_reduction_step(self, x: npt.NDArray) -> np.ndarray:
return x
[docs]
class GradientPerturbation(ObjectivePerturbation, ABC):
"""
A class for perturbations performed by decreasing an objective function
using the gradient.
Parameters
----------
func : Callable
The objective function to be perturbed.
grad : Callable
The gradient function of the objective function.
func_args : List
The arguments to be passed to the objective function.
grad_args : List
The arguments to be passed to the gradient function.
n_red : int, optional
The reduction factor, by default 1.
Attributes
----------
func : FuncWrapper
A wrapped version of the objective function with its arguments.
grad : FuncWrapper
A wrapped gradient function with its arguments.
n_red : int
The number of reduction steps to perform.
_k : int
Keeps track of the number of performed perturbations.
"""
def __init__(
self,
func: Callable,
grad: Callable,
func_args: List,
grad_args: List,
n_red: int = 1,
):
super().__init__(func, func_args, n_red)
self.grad = FuncWrapper(grad, grad_args)
class AdaptiveStepGradientPerturbation(GradientPerturbation):
"""
Objective function perturbation using gradient descent with adaptive
step size reduction.
Parameters
----------
func : Callable
The function to be reduced.
grad : Callable
The gradient of the function to be reduced.
func_args : List, optional
Additional arguments to be passed to the function, by default [].
grad_args : List, optional
Additional arguments to be passed to the gradient function, by default [].
func_level : float, optional
Value above which to perform a gradient step using a subgradient projection like step.
max_func_level : float, optional
Upper bound for the func_level; once reached the update step is skipped, by default np.inf.
epsilon : float, optional
Default value to update func_level by.
noisy : bool, optional
Changes the behavior of the update step.
References
----------
- [1] https://doi.org/10.1007/s11075-021-01229-z
"""
def __init__(
self,
func: Callable,
grad: Callable,
func_args: List | None = None,
grad_args: List | None = None,
func_level: float = 0.5,
max_func_level: float = np.inf,
epsilon: float = 1e-6,
noisy: bool = False,
):
if func_args is None:
func_args = []
if grad_args is None:
grad_args = []
super().__init__(func, grad, func_args, grad_args, n_red=1)
self.func_level = func_level
self.max_func_level = max_func_level
self.epsilon = epsilon
self.update_direction = -1 if noisy else 1
self.func_levels = [func_level]
def _function_reduction_step(self, x: npt.NDArray) -> np.ndarray:
"""
Perform a function reduction step using gradient descent with a step
size based on a subgradient projection.
Parameters
----------
x : npt.NDArray
The current point in the algorithm.
Returns
-------
npt.NDArray
The updated point after performing the reduction step.
"""
grad_eval = self.grad(x)
func_eval = self.func(x)
if grad_eval @ grad_eval <= 0 or func_eval <= self.func_level:
# if gradient is zero (or negative) or the function value is below alpha, skip
return x
return x - (func_eval - self.func_level) / (grad_eval @ grad_eval) * grad_eval
def post_step(self, x: npt.NDArray, *args, **kwargs) -> None:
"""Update func_level after each step."""
last_proximity_function_reduction = kwargs.get("last_proximity_function_reduction")[0]
last_proximity_basic = kwargs.get("last_proximity_basic")[0]
if last_proximity_basic < 1e-10 or last_proximity_function_reduction < 1e-10:
desirability = 0
else:
desirability = (
np.sqrt(last_proximity_function_reduction) - np.sqrt(last_proximity_basic)
) / np.sqrt(last_proximity_basic)
self.func_level = self.func_level + max(
self.epsilon, self.update_direction * self.func_level * desirability
)
self.func_levels.append(self.func_level)
def reset(self) -> None:
"""Reset the perturbation to its initial state."""
super().reset()
self._l = -1
[docs]
class PowerSeriesGradientPerturbation(GradientPerturbation):
"""
Objective function perturbation using gradient descent with step size
reduction according to a power series.
Has the option to restart the power series after a certain number of
steps.
Parameters
----------
func : Callable
The function to be reduced.
grad : Callable
The gradient of the function to be reduced.
func_args : List, optional
Additional arguments to be passed to the function, by default [].
grad_args : List, optional
Additional arguments to be passed to the gradient function, by default [].
n_red : int, optional
The number of reductions, by default 1.
step_size : float, optional
The step size for the gradient descent, by default 0.5.
step_size_modifier : float, optional
Scaling factor for the step size power series, by default 1.0.
n_restart : int, optional
The number of steps after which to restart the power series, by default -1 (no restart).
disable_gradient_scaling : bool, optional
If true, skip the normalization of the gradient, by default False.
iterative_scaling : bool, optional
If true, the power series is scaled by the iteration k without checking whether this actually decreases the objective, by default False.
bypass_objective_decrease : bool, optional
If true, the check whether the objective function value actually decreases is bypassed, by default False.
"""
def __init__(
self,
func: Callable,
grad: Callable,
func_args: List | None = None,
grad_args: List | None = None,
n_red: int = 1,
step_size: float = 0.5,
step_size_modifier: float = 1.0,
n_restart: int = -1,
disable_gradient_scaling: bool = False,
iterative_scaling: bool = False,
bypass_objective_decrease: bool = False,
):
if func_args is None:
func_args = []
if grad_args is None:
grad_args = []
super().__init__(func, grad, func_args, grad_args, n_red)
self.step_size = step_size
self.step_size_modifier = step_size_modifier
self._l = -1
self.n_restart = np.inf if n_restart == -1 else n_restart
self.disable_gradient_scaling = disable_gradient_scaling
self.iterative_scaling = iterative_scaling
self.bypass_objective_decrease = bypass_objective_decrease
self._last_grad_norm = 1
def _function_reduction_step(self, x: npt.NDArray) -> np.ndarray:
"""
Perform a function reduction step using gradient descent.
Parameters
----------
x : npt.NDArray
The current point in the algorithm.
Returns
-------
npt.NDArray
The updated point after performing the reduction step.
"""
# quick check if the step size is already zero, if so skip the rest of the calculations
if self.step_size_modifier * self.step_size**self._l * self._last_grad_norm <= 1e-14:
return x
xp = cp if isinstance(x, cp.ndarray) else np
grad_eval = self.grad(x)
func_eval = self.func(x)
if grad_eval @ grad_eval <= 0:
return x
grad_norm = 1 if self.disable_gradient_scaling else xp.linalg.norm(grad_eval)
self._last_grad_norm = grad_norm
if not self.iterative_scaling:
while True:
self._l += 1
x_ln = (
x - self.step_size_modifier * self.step_size**self._l * grad_eval / grad_norm
)
if self.bypass_objective_decrease or self.func(x_ln) <= func_eval:
return x_ln
else:
return (
x - self.step_size_modifier * self.step_size ** (self._k) * grad_eval / grad_norm
)
[docs]
def pre_step(self, x: npt.NDArray, *args, **kwargs) -> None:
"""
Resets the power series after n steps.
Parameters
----------
x : npt.NDArray
Current iterate.
"""
if self._k <= 0:
return
if self._k % self.n_restart == 0:
self._l = self._k // self.n_restart
[docs]
def reset(self) -> None:
"""Reset the perturbation to its initial state."""
super().reset()
self._l = -1