"""Simple projection objects."""
from abc import ABC, abstractmethod
import math
from typing import List
import numpy as np
import numpy.typing as npt
import matplotlib.pyplot as plt
from matplotlib import patches
from suppy.projections._projections import BasicProjection
try:
import cupy as cp
NO_GPU = False
except ImportError:
NO_GPU = True
cp = np
# from suppy.utils.decorators import ensure_float_array
# Class for basic projections
[docs]
class BoxProjection(BasicProjection):
"""
BoxProjection class for projecting points onto a box defined by lower
and upper bounds.
Parameters
----------
lb : npt.NDArray
Lower bounds of the box.
ub : npt.NDArray
Upper bounds of the box.
idx : npt.NDArray or None
Subset of the input vector to apply the projection on.
relaxation : float, optional
Relaxation parameter for the projection, by default 1.
proximity_flag : bool
Flag to indicate whether to take this object into account when calculating proximity,
by default True.
Attributes
----------
lb : npt.NDArray
Lower bounds of the box.
ub : npt.NDArray
Upper bounds of the box.
relaxation : float
Relaxation parameter for the projection.
proximity_flag : bool
Flag to indicate whether to take this object into account when calculating proximity.
idx : npt.NDArray
Subset of the input vector to apply the projection on.
"""
def __init__(
self,
lb: npt.NDArray,
ub: npt.NDArray,
relaxation: float = 1,
idx: npt.NDArray | None = None,
proximity_flag=True,
use_gpu=False,
):
super().__init__(relaxation, idx, proximity_flag, use_gpu)
self.lb = lb
self.ub = ub
def _project(self, x: npt.NDArray) -> np.ndarray:
"""
Projects the input array `x` onto the bounds defined by `self.lb`
and `self.ub`.
Parameters
----------
x : npt.NDArray
Input array to be projected. Can be a NumPy array or a CuPy array.
Returns
-------
npt.NDArray
The projected array with values clipped to the specified bounds.
Notes
-----
This method modifies the input array `x` in place.
"""
xp = cp if isinstance(x, cp.ndarray) else np
x[self.idx] = xp.maximum(self.lb, xp.minimum(self.ub, x[self.idx]))
return x
def _proximity(self, x: npt.NDArray, proximity_measures: List) -> float:
res = abs(x[self.idx] - self._project(x.copy())[self.idx])
measures = []
for measure in proximity_measures:
if isinstance(measure, tuple):
if measure[0] == "p_norm":
measures.append(1 / len(res) * (res ** measure[1]).sum())
else:
raise ValueError("Invalid proximity measure")
elif isinstance(measure, str) and measure == "max_norm":
measures.append(res.max())
else:
raise ValueError("Invalid proximity measure")
return measures
[docs]
def visualize(self, ax: plt.Axes | None = None, color=None):
"""
Visualize the box if it is 2D on a given matplotlib Axes.
Parameters
----------
ax : plt.Axes, optional
The matplotlib Axes to plot on. If None, a new figure and axes are created.
color : str or None, optional
The color to fill the box with. If None, the box will be filled with the default color.
Raises
------
ValueError
If the box is not 2-dimensional.
"""
if len(self.lb) != 2:
raise ValueError("Visualization only possible for 2D boxes")
if ax is None:
_, ax = plt.subplots()
box = patches.Rectangle(
(self.lb[0], self.lb[1]),
self.ub[0] - self.lb[0],
self.ub[1] - self.lb[1],
linewidth=1,
edgecolor="black",
facecolor=color,
alpha=0.5,
)
ax.add_patch(box)
[docs]
def get_xy(self):
"""
Generate the coordinates for the edges of a box if it is 2D.
This method creates four edges of a 2D box defined by the lower bounds (lb) and upper bounds (ub).
The edges are generated using 100 points each.
Returns
-------
npt.NDArray
A 2D array of shape (2, 400) containing the concatenated coordinates of the four edges.
Raises
------
ValueError
If the box is not 2-dimensional.
"""
if len(self.lb) != 2:
raise ValueError("Visualization only possible for 2D boxes")
edge_1 = np.array([np.linspace(self.lb[0], self.ub[0], 100), np.ones(100) * self.lb[1]])
edge_2 = np.array([np.ones(100) * self.ub[0], np.linspace(self.lb[1], self.ub[1], 100)])
edge_3 = np.array([np.linspace(self.lb[0], self.ub[0], 100), np.ones(100) * self.ub[1]])
edge_4 = np.array([np.ones(100) * self.lb[0], np.linspace(self.lb[1], self.ub[1], 100)])
return np.concatenate((edge_1, edge_2, edge_3[:, ::-1], edge_4[:, ::-1]), axis=1)
[docs]
class WeightedBoxProjection(BasicProjection):
"""
WeightedBoxProjection applies a weighted projection on a box defined by
lower and upper bounds.
The idea is a "simultaneous" variant to the "sequential" BoxProjection.
Parameters
----------
lb : npt.NDArray
Lower bounds of the box.
ub : npt.NDArray
Upper bounds of the box.
weights : npt.NDArray
Weights for the projection.
relaxation : float, optional
Relaxation parameter, by default 1.
idx : npt.NDArray or None
Subset of the input vector to apply the projection on.
proximity_flag : bool, optional
Flag to indicate if proximity should be calculated, by default True.
use_gpu : bool, optional
Flag to indicate if GPU should be used, by default False.
Attributes
----------
lb : npt.NDArray
Lower bounds of the box.
ub : npt.NDArray
Upper bounds of the box.
relaxation : float
Relaxation parameter for the projection.
proximity_flag : bool
Flag to indicate whether to take this object into account when calculating proximity.
idx : npt.NDArray
Subset of the input vector to apply the projection on.
"""
def __init__(
self,
lb: npt.NDArray,
ub: npt.NDArray,
weights: npt.NDArray,
relaxation: float = 1,
idx: npt.NDArray | None = None,
proximity_flag=True,
use_gpu=False,
):
super().__init__(relaxation, idx, proximity_flag, use_gpu)
self.lb = lb
self.ub = ub
self.weights = weights / weights.sum()
def _project(self, x: npt.NDArray) -> np.ndarray:
"""
Projects the input array `x`.
Parameters
----------
x : npt.NDArray
The input array to be projected.
Returns
-------
npt.NDArray
The projected array.
Notes
-----
This method modifies the input array `x` in place.
"""
xp = cp if isinstance(x, cp.ndarray) else np
x[self.idx] += self.weights * (
xp.maximum(self.lb, xp.minimum(self.ub, x[self.idx])) - x[self.idx]
)
return x
def _full_project(self, x: npt.NDArray) -> np.ndarray:
"""
Projects the elements of the input array `x` within the specified
bounds.
Parameters
----------
x : npt.NDArray
Input array to be projected.
Returns
-------
npt.NDArray
The projected array with elements constrained within the bounds.
"""
xp = cp if isinstance(x, cp.ndarray) else np
x[self.idx] = xp.maximum(self.lb, xp.minimum(self.ub, x[self.idx]))
return x
def _proximity(self, x: npt.NDArray, proximity_measures: List) -> float:
res = abs(x[self.idx] - self._project(x.copy())[self.idx])
measures = []
for measure in proximity_measures:
if isinstance(measure, tuple):
if measure[0] == "p_norm":
measures.append(self.weights @ (res ** measure[1]))
else:
raise ValueError("Invalid proximity measure")
elif isinstance(measure, str) and measure == "max_norm":
measures.append(res.max())
else:
raise ValueError("Invalid proximity measure")
return measures
[docs]
def visualize(self, ax: plt.Axes | None = None, color=None):
"""
Visualize the box if it is 2D on a given matplotlib Axes.
Parameters
----------
ax : plt.Axes, optional
The matplotlib Axes to plot on. If None, a new figure and axes are created.
color : str or None, optional
The color to fill the box with. If None, the box will be filled with the default color.
Raises
------
ValueError
If the box is not 2-dimensional.
"""
if len(self.lb) != 2:
raise ValueError("Visualization only possible for 2D boxes")
if ax is None:
_, ax = plt.subplots()
box = patches.Rectangle(
(self.lb[0], self.lb[1]),
self.ub[0] - self.lb[0],
self.ub[1] - self.lb[1],
linewidth=1,
edgecolor="black",
facecolor=color,
alpha=0.5,
)
ax.add_patch(box)
[docs]
def get_xy(self):
"""
Generate the coordinates for the edges of a box if it is 2D.
This method creates four edges of a 2D box defined by the lower bounds (lb) and upper bounds (ub).
The edges are generated using 100 points each.
Returns
-------
np.ndarray
A 2D array of shape (2, 400) containing the concatenated coordinates of the four edges.
Raises
------
ValueError
If the box is not 2-dimensional.
"""
if len(self.lb) != 2:
raise ValueError("Visualization only possible for 2D boxes")
edge_1 = np.array([np.linspace(self.lb[0], self.ub[0], 100), np.ones(100) * self.lb[1]])
edge_2 = np.array([np.ones(100) * self.ub[0], np.linspace(self.lb[1], self.ub[1], 100)])
edge_3 = np.array([np.linspace(self.lb[0], self.ub[0], 100), np.ones(100) * self.ub[1]])
edge_4 = np.array([np.ones(100) * self.lb[0], np.linspace(self.lb[1], self.ub[1], 100)])
return np.concatenate((edge_1, edge_2, edge_3[:, ::-1], edge_4[:, ::-1]), axis=1)
# Projection onto a single halfspace
[docs]
class HalfspaceProjection(BasicProjection):
"""
A class used to represent a projection onto a halfspace.
Parameters
----------
a : npt.NDArray
The normal vector defining the halfspace.
b : float
The offset value defining the halfspace.
relaxation : float, optional
The relaxation parameter, by default 1.
idx : npt.NDArray or None
Subset of the input vector to apply the projection on.
proximity_flag : bool, optional
Flag to indicate whether to take this object into account when calculating proximity, by default True.
use_gpu : bool, optional
Flag to indicate if GPU should be used, by default False.
Attributes
----------
a : npt.NDArray
The normal vector defining the halfspace.
a_norm : npt.NDArray
The normalized normal vector.
b : float
The offset value defining the halfspace.
relaxation : float
The relaxation parameter for the projection.
proximity_flag : bool
Flag to indicate whether to take this object into account when calculating proximity.
idx : npt.NDArray
Subset of the input vector to apply the projection on.
"""
def __init__(
self,
a: npt.NDArray,
b: float,
relaxation: float = 1,
idx: npt.NDArray | None = None,
proximity_flag=True,
use_gpu=False,
):
super().__init__(relaxation, idx, proximity_flag, use_gpu)
self.a = a
self.a_norm = self.a / (self.a @ self.a)
self.b = b
def _linear_map(self, x):
return self.a @ x
def _project(self, x: npt.NDArray) -> np.ndarray:
"""
Projects the input array `x`.
Parameters
----------
x : npt.NDArray
The input array to be projected.
Returns
-------
npt.NDArray
The projected array.
Notes
-----
This method modifies the input array `x` in place.
"""
# TODO: dtype check!
y = self._linear_map(x[self.idx])
if y > self.b:
x[self.idx] -= (y - self.b) * self.a_norm
return x
[docs]
def get_xy(self, x: npt.NDArray | None = None):
"""
Generate x and y coordinates for visualization of 2D halfspaces.
Parameters
----------
x : npt.NDArray or None, optional
The x-coordinates for which to compute the corresponding y-coordinates.
If None, a default range of x values from -10 to 10 is used.
Returns
-------
np.ndarray
A 2D array where the first row contains the x-coordinates and the second row contains the corresponding y-coordinates.
Raises
------
ValueError
If the halfspace is not 2-dimensional.
"""
if len(self.a) != 2:
raise ValueError("Visualization only possible for 2D halfspaces")
if x is None:
x = np.linspace(-10, 10, 100)
if self.a[1] == 0:
y = np.array([np.ones(100) * self.b, np.linspace(-10, 10, 100)])
else:
y = (self.b - self.a[0] * x) / self.a[1]
return np.array([x, y])
[docs]
def visualize(
self,
ax: plt.Axes | None = None,
x: npt.NDArray | None = None,
y_fill: npt.NDArray | None = None,
color=None,
):
"""
Visualize the halfspace if it is 2D on a given matplotlib Axes.
Parameters
----------
ax : plt.Axes, optional
The matplotlib Axes to plot on. If None, a new figure and axes are created.
color : str or None, optional
The color to fill the box with. If None, the halfspace will be filled with the default color.
Raises
------
ValueError
If the halfspace is not 2-dimensional.
"""
if len(self.a) != 2:
raise ValueError("Visualization only possible for 2D halfspaces")
if ax is None:
_, ax = plt.subplots()
if x is None:
x = np.linspace(-10, 10, 100)
if self.a[1] == 0:
ax.axvline(x=self.b / self.a[0], label="Halfspace", color=color)
if np.sign(self.a[0]) == 1:
ax.fill_betweenx(
x,
ax.get_xlim()[0],
self.b,
color=color,
label="Halfspace",
alpha=0.5,
)
else:
ax.fill_betweenx(
x,
self.b,
ax.get_xlim()[1],
color=color,
label="Halfspace",
alpha=0.5,
)
else:
y = (self.b - self.a[0] * x) / self.a[1]
ax.plot(x, y, color="xkcd:black")
if y_fill is None:
y_fill = np.min(y) if self.a[1] > 0 else np.max(y)
ax.fill_between(x, y, y_fill, color=color, label="Halfspace", alpha=0.5)
[docs]
class BandProjection(BasicProjection):
"""
A class used to represent a projection onto a band.
Parameters
----------
a : npt.NDArray
The normal vector defining the halfspace.
lb : float
The lower bound of the band.
ub : float
The upper bound of the band.
idx : npt.NDArray or None
Subset of the input vector to apply the projection on.
relaxation : float, optional
The relaxation parameter, by default 1.
idx : npt.NDArray or None
Subset of the input vector to apply the projection on.
Attributes
----------
a : npt.NDArray
The normal vector defining the halfspace.
a_norm : npt.NDArray
The normalized normal vector.
lb : float
The lower bound of the band.
ub : float
The upper bound of the band.
relaxation : float
The relaxation parameter for the projection.
proximity_flag : bool
Flag to indicate whether to take this object into account when calculating proximity.
idx : npt.NDArray
Subset of the input vector to apply the projection on.
"""
def __init__(
self,
a: npt.NDArray,
lb: float,
ub: float,
relaxation: float = 1,
idx: npt.NDArray | None = None,
proximity_flag=True,
use_gpu=False,
):
super().__init__(relaxation, idx, proximity_flag, use_gpu)
self.a = a
self.a_norm = self.a / (self.a @ self.a)
self.lb = lb
self.ub = ub
def _project(self, x: npt.NDArray) -> np.ndarray:
"""
Projects the input array `x`.
Parameters
----------
x : npt.NDArray
The input array to be projected.
Returns
-------
npt.NDArray
The projected array.
Notes
-----
This method modifies the input array `x` in place.
"""
y = self.a @ x[self.idx]
if y > self.ub:
x[self.idx] -= (y - self.ub) * self.a_norm
elif y < self.lb:
x[self.idx] -= (y - self.lb) * self.a_norm
return x
[docs]
def get_xy(self, x: npt.NDArray | None = None):
"""
Calculate the x and y coordinates for the lower and upper bounds of
a 2D band.
Parameters
----------
x : npt.NDArray or None, optional
The x-coordinates at which to evaluate the bounds. If None, a default range
from -10 to 10 with 100 points is used.
Returns
-------
tuple of np.ndarray
A tuple containing two numpy arrays:
- The first array represents the x and y coordinates for the lower bound.
- The second array represents the x and y coordinates for the upper bound.
Raises
------
ValueError
If the band is not 2-dimensional.
"""
if len(self.a) != 2:
raise ValueError("Visualization only possible for 2D bands")
if x is None:
x = np.linspace(-10, 10, 100)
if self.a[1] == 0:
y_lb = np.array([np.ones(100) * self.lb, np.linspace(-10, 10, 100)])
y_ub = np.array([np.ones(100) * self.ub, np.linspace(-10, 10, 100)])
else:
y_lb = (self.lb - self.a[0] * x) / self.a[1]
y_ub = (self.ub - self.a[0] * x) / self.a[1]
return np.array([x, y_lb]), np.array([x, y_ub])
[docs]
def visualize(self, ax: plt.Axes | None = None, x: npt.NDArray | None = None, color=None):
"""
Visualize the band if it is 2D on a given matplotlib Axes.
Parameters
----------
ax : plt.Axes, optional
The matplotlib Axes to plot on. If None, a new figure and axes are created.
color : str or None, optional
The color to fill the box with. If None, the band will be filled with the default color.
Raises
------
ValueError
If the band is not 2-dimensional.
"""
if len(self.a) != 2:
raise ValueError("Visualization only possible for 2D bands")
if ax is None:
_, ax = plt.subplots()
if x is None:
x = np.linspace(-10, 10, 100)
if self.a[1] == 0:
ax.plot(np.ones(100) * self.lb, x, color="xkcd:black")
ax.plot(np.ones(100) * self.ub, x, color="xkcd:black")
# ax.axvline(x = self.b/self.a[0],label='Halfspace',color = color)
if np.sign(self.a[0]) == 1:
ax.fill_betweenx(x, self.lb, self.ub, color=color, label="Band", alpha=0.5)
else:
ax.fill_betweenx(x, self.lb, self.ub, color=color, label="Band", alpha=0.5)
else:
y_lb = (self.lb - self.a[0] * x) / self.a[1]
y_ub = (self.ub - self.a[0] * x) / self.a[1]
ax.plot(x, y_lb, color="xkcd:black")
ax.plot(x, y_ub, color="xkcd:black")
ax.fill_between(x, y_lb, y_ub, color=color, label="Band", alpha=0.5)
[docs]
class BallProjection(BasicProjection):
"""
A class used to represent a projection onto a ball.
Parameters
----------
center : npt.NDArray
The center of the ball.
radius : float
The radius of the ball.
relaxation : float, optional
The relaxation parameter (default is 1).
idx : npt.NDArray or None
Subset of the input vector to apply the projection on.
proximity_flag : bool, optional
Flag to indicate whether to take this object into account when calculating proximity, by default True.
use_gpu : bool, optional
Flag to indicate if GPU should be used, by default False.
Attributes
----------
center : npt.NDArray
The center of the ball.
radius : float
The radius of the ball.
relaxation : float
The relaxation parameter for the projection.
proximity_flag : bool
Flag to indicate whether to take this object into account when calculating proximity.
idx : npt.NDArray
Subset of the input vector to apply the projection on.
"""
def __init__(
self,
center: npt.NDArray,
radius: float,
relaxation: float = 1,
idx: npt.NDArray | None = None,
proximity_flag=True,
use_gpu=False,
):
super().__init__(relaxation, idx, proximity_flag, use_gpu)
self.center = center
self.radius = radius
def _project(self, x: npt.NDArray) -> np.ndarray:
"""
Projects the input array `x` onto the surface of the ball.
Parameters
----------
x : npt.NDArray
The input array to be projected.
Returns
-------
npt.NDArray
The projected array.
"""
xp = cp if isinstance(x, cp.ndarray) else np
if xp.linalg.norm(x[self.idx] - self.center) > self.radius:
x[self.idx] -= (x[self.idx] - self.center) * (
1 - self.radius / xp.linalg.norm(x[self.idx] - self.center)
)
return x
[docs]
def visualize(self, ax: plt.Axes | None = None, color=None, edgecolor=None):
"""
Visualize the halfspace if it is 2D on a given matplotlib Axes.
Parameters
----------
ax : plt.Axes, optional
The matplotlib Axes to plot on. If None, a new figure and axes are created.
color : str or None, optional
The color to fill the box with. If None, the halfspace will be filled with the default color.
Raises
------
ValueError
If the halfspace is not 2-dimensional.
"""
if len(self.center) != 2:
raise ValueError("Visualization only possible for 2D balls")
if ax is None:
_, ax = plt.subplots()
circle = plt.Circle(
(self.center[0], self.center[1]),
self.radius,
facecolor=color,
alpha=0.5,
edgecolor=edgecolor,
)
ax.add_artist(circle)
[docs]
def get_xy(self):
"""
Generate x and y coordinates for a 2D ball visualization.
Returns
-------
np.ndarray
A 2x50 array where the first row contains the x coordinates and the
second row contains the y coordinates of the points on the circumference
of the 2D ball.
Raises
------
ValueError
If the center does not have exactly 2 dimensions.
"""
if len(self.center) != 2:
raise ValueError("Visualization only possible for 2D balls")
theta = np.linspace(0, 2 * np.pi, 50)
x = self.center[0] + self.radius * np.cos(theta)
y = self.center[1] + self.radius * np.sin(theta)
return np.array([x, y])
[docs]
class MaxDVHProjection(BasicProjection):
"""
Class for max dose-volume histogram projections.
Parameters
----------
d_max : float
The maximum dose value.
max_percentage : float
The maximum percentage of elements allowed to exceed d_max.
idx : npt.NDArray or None
Subset of the input vector to apply the projection on.
Attributes
----------
d_max : float
The maximum dose value.
max_percentage : float
The maximum percentage of elements allowed to exceed d_max.
"""
def __init__(
self,
d_max: float,
max_percentage: float,
idx: npt.NDArray | None = None,
relaxation: float = 1.0,
proximity_flag=True,
use_gpu=False,
):
super().__init__(
relaxation=relaxation, idx=idx, proximity_flag=proximity_flag, _use_gpu=use_gpu
)
# max percentage of elements that are allowed to exceed d_max
self.max_percentage = max_percentage
self.d_max = d_max
if isinstance(self.idx, slice):
self._idx_indices = None
elif self.idx.dtype == bool:
raise ValueError("Boolean indexing is not supported for this projection.")
else:
if self._use_gpu:
self._idx_indices = cp.asarray(self.idx, dtype=cp.int32)
else:
self._idx_indices = self.idx
def _project(self, x: npt.NDArray) -> np.ndarray:
"""
Projects the input array `x` onto the DVH constraint.
Parameters
----------
x : npt.NDArray
The input array to be projected.
Returns
-------
npt.NDArray
The projected array.
"""
if isinstance(self.idx, slice):
return self._project_all(x)
return self._project_subset(x)
def _project_all(self, x: npt.NDArray) -> np.ndarray:
n = len(x)
am = math.floor(self.max_percentage * n)
l = (x > self.d_max).sum()
z = l - am
if z > 0:
x[x.argsort()[n - l : n - am]] = self.d_max
return x
def _project_subset(self, x: npt.NDArray) -> np.ndarray:
n = self.idx.sum() if self.idx.dtype == bool else len(self.idx)
am = math.floor(self.max_percentage * n)
l = (x[self.idx] > self.d_max).sum()
z = l - am # number of elements that need to be reduced
if z > 0:
x[self._idx_indices[x[self.idx].argsort()[n - l : n - am]]] = self.d_max
return x
# def _project(self, x: npt.NDArray) -> np.ndarray:
# """
# Projects the input array `x` onto the DVH constraint.
# Parameters
# ----------
# x : npt.NDArray
# The input array to be projected.
# Returns
# -------
# npt.NDArray
# The projected array.
# Notes
# -----
# - The method calculates the number of elements that should receive a dose lower than `d_max` based on `max_percentage`.
# - It then determines how many elements in the input array exceed `d_max`.
# - If the number of elements exceeding `d_max` is greater than the allowed maximum, it reduces the highest values to `d_max`.
# """
# # percentage of elements that should receive a dose lower than d_max
# n = len(x) if isinstance(self.idx, slice) else self.idx.sum()
# am = math.floor(self.max_percentage * n)
# # number of elements in structure with dose greater than d_max
# l = (x[self.idx] > self.d_max).sum()
# z = l - am # number of elements that need to be reduced
# if z > 0:
# x[x[self.idx].argsort()[n - l : n - am]] = self.d_max
# return x
def _proximity(self, x: npt.NDArray, proximity_measures: List) -> List[float]:
"""
Calculate the proximity of the given array to a specified maximum
percentage.
Parameters
----------
x : npt.NDArray
Input array to be evaluated.
Returns
-------
float
The proximity value as a percentage.
"""
# TODO: Find appropriate proximity measure
if isinstance(self.idx, slice):
return self._proximity_all(x, proximity_measures)
return self._proximity_subset(x, proximity_measures)
def _proximity_all(self, x: npt.NDArray, proximity_measures: List) -> List[float]:
n = len(x)
am = math.floor(self.max_percentage * n)
l = (x > self.d_max).sum()
z = l - am
if z > 0:
x_over = x[x.argsort()[n - l : n - am]] - self.d_max
else:
return [0 for measure in proximity_measures]
measures = []
for measure in proximity_measures:
if isinstance(measure, tuple):
if measure[0] == "p_norm":
measures.append((x_over ** measure[1]).sum() / len(x))
else:
raise ValueError("Invalid proximity measure")
elif isinstance(measure, str) and measure == "max_norm":
measures.append(x_over.max())
else:
raise ValueError("Invalid proximity measure")
return measures
def _proximity_subset(self, x: npt.NDArray, proximity_measures: List) -> List[float]:
n = self.idx.sum() if self.idx.dtype == bool else len(self.idx)
am = math.floor(self.max_percentage * n)
l = (x[self.idx] > self.d_max).sum()
z = l - am # number of elements that need to be reduced
if z > 0:
x_over = x[self._idx_indices[x[self.idx].argsort()[n - l : n - am]]] - self.d_max
else:
return [0 for measure in proximity_measures]
measures = []
for measure in proximity_measures:
if isinstance(measure, tuple):
if measure[0] == "p_norm":
measures.append((x_over ** measure[1]).sum() / n)
else:
raise ValueError("Invalid proximity measure")
elif isinstance(measure, str) and measure == "max_norm":
measures.append(x_over.max())
else:
raise ValueError("Invalid proximity measure")
return measures
[docs]
class MinDVHProjection(BasicProjection):
""""""
def __init__(
self,
d_min: float,
min_percentage: float,
idx: npt.NDArray | None = None,
relaxation: float = 1.0,
proximity_flag=True,
use_gpu=False,
):
super().__init__(
relaxation=relaxation, idx=idx, proximity_flag=proximity_flag, _use_gpu=use_gpu
)
# percentage of elements that need to have at least d_min
self.min_percentage = min_percentage
self.d_min = d_min
if isinstance(self.idx, slice):
self._idx_indices = None
elif self.idx.dtype == bool:
raise ValueError("Boolean indexing is not supported for this projection.")
else:
if self._use_gpu:
self._idx_indices = cp.asarray(self.idx, dtype=cp.int32)
else:
self._idx_indices = self.idx
def _project(self, x: npt.NDArray) -> np.ndarray:
"""
Projects the input array `x` onto the DVH constraint.
Parameters
----------
x : npt.NDArray
The input array to be projected.
Returns
-------
npt.NDArray
The projected array.
"""
if isinstance(self.idx, slice):
return self._project_all(x)
return self._project_subset(x)
def _project_all(self, x: npt.NDArray) -> np.ndarray:
n = len(x)
am = math.ceil(self.min_percentage * n)
l = (x < self.d_min).sum()
z = l - n + am
if z > 0:
x[x.argsort()[n - am : l]] = self.d_min
return x
def _project_subset(self, x: npt.NDArray) -> np.ndarray:
n = self.idx.sum() if self.idx.dtype == bool else len(self.idx)
am = math.ceil(self.min_percentage * n)
l = (x[self.idx] < self.d_min).sum()
z = l - n + am # number of elements that need to be reduced
if z > 0:
x[self._idx_indices[x[self.idx].argsort()[n - am : l]]] = self.d_min
return x
def _proximity(self, x: npt.NDArray, proximity_measures: List) -> List[float]:
"""
Calculate the proximity of the given array to a specified maximum
percentage.
Parameters
----------
x : npt.NDArray
Input array to be evaluated.
Returns
-------
List[float]
List of proximity values.
"""
# TODO: Find appropriate proximity measure
if isinstance(self.idx, slice):
return self._proximity_all(x, proximity_measures)
return self._proximity_subset(x, proximity_measures)
def _proximity_all(self, x: npt.NDArray, proximity_measures: List) -> List[float]:
"""
Calculate the proximity of the given array to a specified maximum
percentage.
Parameters
----------
x : npt.NDArray
Input array to be evaluated.
Returns
-------
float
The proximity value as a percentage.
"""
# TODO: Find appropriate proximity measure
n = len(x)
am = math.ceil(self.min_percentage * n)
l = (x < self.d_min).sum()
z = l - n + am
if z > 0:
x_under = self.d_min - x[x.argsort()[n - am : l]]
else:
return [0 for measure in proximity_measures]
measures = []
for measure in proximity_measures:
if isinstance(measure, tuple):
if measure[0] == "p_norm":
measures.append((x_under ** measure[1]).sum() / n)
else:
raise ValueError("Invalid proximity measure")
elif isinstance(measure, str) and measure == "max_norm":
measures.append(x_under.max())
else:
raise ValueError("Invalid proximity measure")
return measures
def _proximity_subset(self, x: npt.NDArray, proximity_measures: List) -> List[float]:
"""
Calculate the proximity of the given array to a specified maximum
percentage.
Parameters
----------
x : npt.NDArray
Input array to be evaluated.
Returns
-------
float
The proximity value as a percentage.
"""
n = self.idx.sum() if self.idx.dtype == bool else len(self.idx)
am = math.ceil(self.min_percentage * n)
l = (x[self.idx] < self.d_min).sum()
z = l - n + am # number of elements that need to be reduced
if z > 0:
x_under = self.d_min - x[self._idx_indices[x[self.idx].argsort()[n - am : l]]]
else:
return [0 for measure in proximity_measures]
measures = []
for measure in proximity_measures:
if isinstance(measure, tuple):
if measure[0] == "p_norm":
measures.append((x_under ** measure[1]).sum() / n)
else:
raise ValueError("Invalid proximity measure")
elif isinstance(measure, str) and measure == "max_norm":
measures.append(x_under.max())
else:
raise ValueError("Invalid proximity measure")
return measures
class CustomProjection(BasicProjection):
"""
CustomProjection allows users to set up custom projection objects.
Parameters
----------
projection_function : callable
User-defined function for projection.
proximity_function : callable, optional
User-defined function for proximity calculation, by default None.
If None, the proximity is calculated based on P(x)-x residuals.
"""
def __init__(
self,
projection_function,
proximity_function=None,
relaxation=1,
idx: npt.NDArray | None = None,
proximity_flag=True,
_use_gpu=False,
):
super().__init__(relaxation, idx, proximity_flag, _use_gpu)
self.projection_function = projection_function
self.proximity_function = proximity_function
def _project(self, x: npt.NDArray) -> npt.NDArray:
return self.projection_function(x)
def _proximity(self, x: npt.NDArray, proximity_measures: List) -> List[float]:
if self.proximity_function is None:
return super()._proximity(x, proximity_measures)
else:
return self.proximity_function(x, proximity_measures)
class MMUProjection(BasicProjection, ABC):
"""
Class for enforcing minimum monitoring units. They can only be 0 or
above a certain threshold (mmu).
"""
# Default idea:
def __init__(
self,
mmu: float | npt.NDArray,
relaxation: float = 1.0,
idx: npt.NDArray | None = None,
proximity_flag=True,
_use_gpu=False,
):
super().__init__(
relaxation=relaxation, idx=idx, proximity_flag=proximity_flag, _use_gpu=_use_gpu
)
if isinstance(self.idx, slice):
self._idx_indices = None
elif self.idx.dtype == bool:
raise ValueError("Boolean indexing is not supported for this projection.")
else:
if self._use_gpu:
self._idx_indices = cp.asarray(self.idx, dtype=cp.int32)
else:
self._idx_indices = self.idx
self.mmu = mmu
@abstractmethod
def _project(self, x: npt.NDArray) -> npt.NDArray:
pass
class MMUProjection1(MMUProjection):
def _project(self, x: npt.NDArray) -> npt.NDArray:
"""
First idea:
If x[i] < mmu set x[i] to 0, else leave it unchanged.
"""
xp = cp if isinstance(x, cp.ndarray) else np
x[self.idx] = xp.where((x[self.idx] < self.mmu), 0, x[self.idx])
return x
class MMUProjection2(MMUProjection):
def _project(self, x: npt.NDArray) -> npt.NDArray:
"""
Second idea:
Project to closest point, e.g. if x[i] < mmu/2 set x[i] to 0, else set
x[i] to mmu.
"""
xp = cp if isinstance(x, cp.ndarray) else np
x[self.idx] = xp.where(
x[self.idx] <= self.mmu / 2,
0,
xp.where(x[self.idx] <= self.mmu, self.mmu, x[self.idx]),
)
return x
class MMUProjectionMinMMUPercentage(MMUProjection):
"""
A minimum percentage of the elements have to be at level of the mmu or
the rest has to be above.
If too many elements are below mmu, the ones closest below mmu are set to
mmu until the percentage is reached.
"""
def __init__(
self,
mmu: float | npt.NDArray,
min_percentage: float,
idx: npt.NDArray | None = None,
relaxation: float = 1.0,
proximity_flag=True,
use_gpu=False,
):
super().__init__(
mmu, relaxation=relaxation, idx=idx, proximity_flag=proximity_flag, _use_gpu=use_gpu
)
self.min_percentage = min_percentage
def _project(self, x: npt.NDArray) -> np.ndarray:
"""
Projects the input array `x` onto the DVH constraint.
Parameters
----------
x : npt.NDArray
The input array to be projected.
Returns
-------
npt.NDArray
The projected array.
"""
if isinstance(self.idx, slice):
return self._project_all(x)
return self._project_subset(x)
def _project_all(self, x: npt.NDArray) -> np.ndarray:
xp = cp if isinstance(x, cp.ndarray) else np
n = len(x)
am = math.ceil(
self.min_percentage * n
) # find number of elements that need to be at least mmu
l = (x < self.mmu).sum() # number of elements not fullfilling the mmu requirement
z = l - n + am
if z > 0:
x[x.argsort()[n - am : l]] = self.mmu
x[self.idx] = xp.where(
x[self.idx] <= self.mmu / 2,
0,
xp.where(x[self.idx] <= self.mmu, self.mmu, x[self.idx]),
)
return x
def _project_subset(self, x: npt.NDArray) -> np.ndarray:
xp = cp if isinstance(x, cp.ndarray) else np
n = self.idx.sum() if self.idx.dtype == bool else len(self.idx)
am = math.ceil(self.min_percentage * n)
l = (x[self.idx] < self.mmu).sum()
z = l - n + am # number of elements that need to be reduced
if z > 0:
x[self._idx_indices[x[self.idx].argsort()[n - am : l]]] = self.mmu
x[self.idx] = xp.where(
x[self.idx] <= self.mmu / 2,
0,
xp.where(x[self.idx] <= self.mmu, self.mmu, x[self.idx]),
)
return x
class MMUProjectionMinZeroPercentage(MMUProjection):
"""
A minimum percentage of the elements must have a value of 0.
If too many elements are above, the ones closest are set to 0 until the
percentage is reached.
"""
def __init__(
self,
mmu: float | npt.NDArray,
min_percentage: float,
idx: npt.NDArray | None = None,
relaxation: float = 1.0,
proximity_flag=True,
use_gpu=False,
):
super().__init__(
mmu, relaxation=relaxation, idx=idx, proximity_flag=proximity_flag, _use_gpu=use_gpu
)
self.min_percentage = min_percentage
self.idxs = []
def _project(self, x: npt.NDArray) -> np.ndarray:
"""
Projects the input array `x` onto the DVH constraint.
Parameters
----------
x : npt.NDArray
The input array to be projected.
Returns
-------
npt.NDArray
The projected array.
"""
if isinstance(self.idx, slice):
return self._project_all(x)
return self._project_subset(x)
def _project_all(self, x: npt.NDArray) -> np.ndarray:
xp = cp if isinstance(x, cp.ndarray) else np
n = len(x)
am = math.ceil(self.min_percentage * n) # find number of elements that need to be 0
l = (x > 0).sum() # number of elements that are above the requirement
z = l - n + am
if z > 0:
x[x.argsort()[:am]] = 0
x[self.idx] = xp.where(
x[self.idx] <= self.mmu / 2,
0,
xp.where(x[self.idx] <= self.mmu, self.mmu, x[self.idx]),
)
return x
def _project_subset(self, x: npt.NDArray) -> np.ndarray:
xp = cp if isinstance(x, cp.ndarray) else np
n = self.idx.sum() if self.idx.dtype == bool else len(self.idx)
am = math.ceil(self.min_percentage * n)
l = (x[self.idx] > 0).sum() # number of elements that are above the requirement
z = l - n + am # number of elements that need to be reduced
if z > 0:
x[self._idx_indices[x[self.idx].argsort()[:am]]] = 0
x[self.idx] = xp.where(
x[self.idx] <= self.mmu / 2,
0,
xp.where(x[self.idx] <= self.mmu, self.mmu, x[self.idx]),
)
return x
class VariableMMUProjectionMinMaxZeroPercentage(MMUProjection):
"""
A minimum percentage of the elements must have a value of 0, a minimum
percentage of the elements must be at least mmu and the rest can be above.
Each constraint can have a different MMU!
"""
def __init__(
self,
mmu: npt.NDArray,
min_percentage: float,
max_percentage: float,
idx: npt.NDArray | None = None,
relaxation: float = 1.0,
proximity_flag=True,
use_gpu=False,
):
super().__init__(
mmu, relaxation=relaxation, idx=idx, proximity_flag=proximity_flag, _use_gpu=use_gpu
)
self.min_percentage = min_percentage
self.max_percentage = (
max_percentage # equal to the number of elements that need to be at MMU or above
)
def _project(self, x: npt.NDArray) -> np.ndarray:
"""
Projects the input array `x` onto the DVH constraint.
Parameters
----------
x : npt.NDArray
The input array to be projected.
Returns
-------
npt.NDArray
The projected array.
"""
if isinstance(self.idx, slice):
return self._project_all(x)
return self._project_subset(x)
def _project_all(self, x: npt.NDArray) -> np.ndarray:
xp = cp if isinstance(x, cp.ndarray) else np
n = len(x)
num_min = math.ceil(self.min_percentage * n) # find number of elements that need to be 0
num_max = math.ceil(
self.max_percentage * n
) # find number of elements that need to be at MMU or above
num_above = (x > 0).sum() # number of elements that are above the requirement
# find cost of moving elements to MMU and to 0
cost = x - xp.max(self.mmu, x)
sort_idxs = cost.argsort()
x[sort_idxs[:num_min]] = 0
x[sort_idxs[num_max:]] = xp.where(
x[sort_idxs[num_max:]] < self.mmu[sort_idxs[num_max:]],
self.mmu[sort_idxs[num_max:]],
x[sort_idxs[num_max:]],
)
mask = sort_idxs[num_min:num_max]
x[mask] = xp.where(x[mask] <= self.mmu[mask] / 2, 0, xp.maximum(self.mmu[mask], x[mask]))
return x
# def _project_subset(self, x: npt.NDArray) -> np.ndarray:
# xp = cp if isinstance(x, cp.ndarray) else np
# n = self.idx.sum() if self.idx.dtype == bool else len(self.idx)
# am = math.ceil(self.min_percentage * n)
# l = (x[self.idx] > 0).sum() #number of elements that are above the requirement
# z = l - n + am # number of elements that need to be reduced
# if z > 0:
# x[self._idx_indices[x[self.idx].argsort()[:am]]] = 0
# x[self.idx] = xp.where(
# x[self.idx] <= self.mmu / 2,
# 0,
# xp.where(
# x[self.idx] <= self.mmu,
# self.mmu,
# x[self.idx]
# )
# )
# return x