Skip to content

tools

Scalarization

Defines various functions for scalarizing multiobjective optimization problems.

Note that when scalarization functions are defined, they must add the post-fix '_min' to any symbol representing objective functions so that the maximization or minimization of the corresponding objective functions may be correctly accounted for when computing scalarization function values.

Op

Defines the supported operators in the MathJSON format.

Source code in desdeo/tools/scalarization.py
class Op:
    """Defines the supported operators in the MathJSON format.

    Every class attribute holds the name of one operator as a string. The
    attributes are grouped below by the kind of operation they name.
    """

    # TODO: move this to problem/schema.py, make it use this, and import it here from there
    # Basic arithmetic operators
    NEGATE = "Negate"
    ADD = "Add"
    SUB = "Subtract"
    MUL = "Multiply"
    DIV = "Divide"

    # Exponentiation and logarithms
    # NOTE(review): in the MathJSON standard library "Ln", "Lb" and "Lg" name the
    # natural, base-2 and base-10 logarithms, and "LogOnePlus" names ln(1 + x) --
    # confirm that the expression parsers used with this class follow the same meaning.
    EXP = "Exp"
    LN = "Ln"
    LB = "Lb"
    LG = "Lg"
    LOP = "LogOnePlus"
    SQRT = "Sqrt"
    SQUARE = "Square"
    POW = "Power"

    # Absolute value and rounding operators
    ABS = "Abs"
    CEIL = "Ceil"
    FLOOR = "Floor"

    # Trigonometric and hyperbolic operations (including their inverses)
    ARCCOS = "Arccos"
    ARCCOSH = "Arccosh"
    ARCSIN = "Arcsin"
    ARCSINH = "Arcsinh"
    ARCTAN = "Arctan"
    ARCTANH = "Arctanh"
    COS = "Cos"
    COSH = "Cosh"
    SIN = "Sin"
    SINH = "Sinh"
    TAN = "Tan"
    TANH = "Tanh"

    # Comparison operators
    EQUAL = "Equal"
    GREATER = "Greater"
    GREATER_EQUAL = "GreaterEqual"
    LESS = "Less"
    LESS_EQUAL = "LessEqual"
    NOT_EQUAL = "NotEqual"

    # Other operators
    MAX = "Max"
    RATIONAL = "Rational"

ScalarizationError

Bases: Exception

Raised when issues with creating or adding scalarization functions are encountered.

Source code in desdeo/tools/scalarization.py
class ScalarizationError(Exception):
    """Raised when issues with creating or adding scalarization functions are encountered.

    In this module it is raised, for instance, when a given reference point or
    weight vector is missing a value for one or more objectives, or when the
    ideal or nadir point needed by a scalarization is not defined.
    """

__create_HDF

__create_HDF(
    y: str,
    a: float,
    r: float,
    d1: float = 0.9,
    d2: float = 0.1,
) -> str

Create a Harrington's one-sided desirability function.

Harrington's desirability function is used to compute the desirability of a given value of an objective function based on its aspiration and reservation levels.

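The desirability function is defined as follows:

\[\begin{equation} D(y) = \exp\left(-\exp\left(-(b_0 + b_1 y)\right)\right), \end{equation}\]

where

\[\begin{align*} b_1 &= \frac{-\log(-\log(d_2)) + \log(-\log(d_1))}{r - a}, \\ b_0 &= -\log(-\log(d_1)) - b_1 a, \end{align*}\]

so that \(D(a) = d_1\) and \(D(r) = d_2\).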

The desirability function returns a value between 0 and 1, where higher values indicate more desirable outcomes. I took the equations from the following source: Wagner, T., and Trautmann, H. Integration of preference in hypervolume-based multiobjective evolutionary algorithms by means of desirability functions. IEEE Transactions on Evolutionary Computation 14, 5 (2010), 688-701.

Parameters:

Name Type Description Default
y str

The objective value to compute the desirability for.

required
a float

Aspiration level for the objective.

required
r float

Reservation level for the objective.

required
d1 float

The desirability for the aspiration level.

0.9
d2 float

The desirability for the reservation level.

0.1

Returns:

Name Type Description
expression str

A string expression of the form `Exp(-Exp(-(b0 + b1 * y)))` that computes the desirability of the objective referred to by `y`.

Source code in desdeo/tools/scalarization.py
def __create_HDF(  # noqa: N802
    y: str,
    a: float,
    r: float,
    d1: float = 0.9,
    d2: float = 0.1,
) -> str:
    r"""Create the expression of a Harrington's one-sided desirability function.

    Harrington's desirability function is used to compute the desirability of a
    given value of an objective function based on its aspiration and reservation levels.

    The desirability function is defined as follows:
    \begin{equation}
        D(y) = \exp\left(-\exp\left(-(b_0 + b_1 y)\right)\right),
    \end{equation}

    where
    \begin{align*}
        b_1 &= \frac{-\log(-\log(d_2)) + \log(-\log(d_1))}{r - a}, \\
        b_0 &= -\log(-\log(d_1)) - b_1 a,
    \end{align*}

    so that $D(a) = d_1$ and $D(r) = d_2$.

    The desirability function returns a value between 0 and 1, where higher values indicate
    more desirable outcomes. The equations are taken from the following source:
    Wagner, T., and Trautmann, H. Integration of preference in hypervolume-based
    multiobjective evolutionary algorithms by means of desirability functions.
    IEEE Transactions on Evolutionary Computation 14, 5 (2010), 688-701.

    Args:
        y (str): The symbol (or sub-expression) of the objective value to compute the
            desirability for. It is inserted verbatim into the returned expression.
        a (float): Aspiration level for the objective. Must be less than `r`.
        r (float): Reservation level for the objective.
        d1 (float): The desirability for the aspiration level. Must be in (0, 1).
        d2 (float): The desirability for the reservation level. Must be in (0, 1)
            and less than `d1`.

    Raises:
        ValueError: If `d1` or `d2` is not strictly between 0 and 1, if `a` is not
            less than `r`, or if `d2` is not less than `d1`.

    Returns:
        str: An expression of the form ``Exp(-Exp(-(b0 + b1 * y)))`` with the
            numeric values of `b0` and `b1` and the given `y` substituted in.
    """
    if not (0 < d1 < 1 and 0 < d2 < 1):
        raise ValueError("Desirability values must be between 0 and 1 (exclusive).")
    if not (a < r):
        raise ValueError("a must be less than r.")
    if not d2 < d1:
        raise ValueError("d2 must be less than d1. Higher desirability should correspond to lower values of y.")

    # The whole difference of the double logarithms is divided by (r - a); dividing only
    # the second term would break D(r) = d2 whenever r - a != 1.
    # Converted to plain floats so the values are rendered as bare numeric literals.
    b1: float = float((-np.log(-np.log(d2)) + np.log(-np.log(d1))) / (r - a))
    b0: float = float(-np.log(-np.log(d1)) - b1 * a)

    return f"Exp(-Exp(-({b0} + {b1} * {y})))"

__create_MDF

__create_MDF(
    y: str,
    a: float,
    r: float,
    d1: float = 0.9,
    d2: float = 0.1,
) -> str

Create MaoMao's desirability function.

Distinctions from MaoMao's original function: - The lower and upper bounds of desirability are fixed to 0 and 1, respectively.

Parameters:

Name Type Description Default
y str

The objective value to compute the desirability for.

required
a float

Aspiration level for the objective.

required
r float

Reservation level for the objective.

required
d1 float

The desirability for the aspiration level.

0.9
d2 float

The desirability for the reservation level.

0.1

Returns:

Name Type Description
expression str

A string expression, built from `Max` terms, that computes the desirability of the objective referred to by `y`.

Source code in desdeo/tools/scalarization.py
def __create_MDF(y: str, a: float, r: float, d1: float = 0.9, d2: float = 0.1) -> str:  # noqa: N802
    """Create the expression of MaoMao's desirability function.

    The returned expression is the sum of three pieces, each switched on by a
    ``Max(..., 0) / (...)`` factor:

    - for values of ``y`` below ``a``: ``1 + m1 / (y + b1)``,
    - for values of ``y`` above ``r``: ``m3 / (y + b3)``,
    - for values of ``y`` between ``a`` and ``r``: the line ``m2 * y + b2``.

    The coefficients are chosen so that the pieces evaluate to ``d1`` at ``a``
    and to ``d2`` at ``r``.

    Distinctions from MaoMao's original function:
    - The lower and upper bounds of desirability are fixed to 0 and 1, respectively.

    NOTE(review): the switching factors divide by ``a - y`` and ``y - r``, so the
    expression is not defined exactly at ``y == a`` or ``y == r``.

    Args:
        y (str): The symbol (or sub-expression) of the objective value to compute the
            desirability for. It is inserted verbatim into the returned expression.
        a (float): Aspiration level for the objective. Must be less than `r`.
        r (float): Reservation level for the objective.
        d1 (float): The desirability for the aspiration level. Must be in (0, 1).
        d2 (float): The desirability for the reservation level. Must be in (0, 1)
            and less than `d1`.

    Raises:
        ValueError: If `d1` or `d2` is not strictly between 0 and 1, if `a` is not
            less than `r`, or if `d2` is not less than `d1`.

    Returns:
        str: The expression computing the desirability for the given `y`.
    """
    desirabilities_in_range = 0 < d1 < 1 and 0 < d2 < 1
    if not desirabilities_in_range:
        raise ValueError("Desirability values must be between 0 and 1 (exclusive).")
    if not a < r:
        raise ValueError("a must be less than r.")
    if not (d2 < d1):
        raise ValueError("d2 must be less than d1. Higher desirability should correspond to lower values of y.")

    gap_above = 1 - d1  # distance of d1 from the upper desirability bound 1
    gap_below = d2  # distance of d2 from the lower desirability bound 0

    # hyperbolic piece used below the aspiration level
    upper_scale = -gap_above * gap_above * (a - r) / (d1 - d2)
    upper_shift = -a + gap_above * (a - r) / (d1 - d2)
    # linear piece used between the aspiration and reservation levels
    line_slope = (d1 - d2) / (a - r)
    line_intercept = (d2 * a - d1 * r) / (a - r)
    # hyperbolic piece used above the reservation level
    lower_scale = -gap_below * gap_below * (a - r) / (d1 - d2)
    lower_shift = -r - gap_below * (a - r) / (d1 - d2)

    below_aspiration = f"Max({a} - {y}, 0) * (1 + {upper_scale} / ({y} + {upper_shift})) / ({a} - {y})"
    above_reservation = f"Max({y} - {r}, 0) * ({lower_scale} / ({y} + {lower_shift})) / ({y} - {r})"
    in_between = (
        f"Max({y} - {a}, 0) * Max({r} - {y}, 0) * ({line_slope} * {y} + {line_intercept}) / "
        f"(({y} - {a}) * ({r} - {y}))"
    )

    return " + ".join([below_aspiration, above_reservation, in_between])

add_asf_diff

add_asf_diff(
    problem: Problem,
    symbol: str,
    reference_point: dict[str, float],
    ideal: dict[str, float] | None = None,
    nadir: dict[str, float] | None = None,
    rho: float = 1e-06,
    delta: float = 1e-06,
) -> tuple[Problem, str]

Adds the differentiable variant of the achievement scalarizing function.

\[\begin{align*} \min \quad & \alpha + \rho \sum_{i=1}^k \frac{f_i(\mathbf{x})}{z_i^\text{nad} - z_i^{\star\star}} \\ \text{s.t.} \quad & \frac{f_i(\mathbf{x}) - \bar{z}_i}{z_i^\text{nad} - z_i^{\star\star}} - \alpha \leq 0,\\ & \mathbf{x} \in S, \end{align*}\]

where \(f_i\) are objective functions, \(z_i^{\star\star} = z_i^\star - \delta\) is a component of the utopian point, \(\bar{z}_i\) is a component of the reference point, \(\rho\) and \(\delta\) are small scalar values, \(S\) is the feasible solution space of the original problem, and \(\alpha\) is an auxiliary variable.

References

Wierzbicki, A. P. (1982). A mathematical basis for satisficing decision making. Mathematical modelling, 3(5), 391-405.

Parameters:

Name Type Description Default
problem Problem

the problem the scalarization is added to.

required
symbol str

the symbol given to the added scalarization.

required
reference_point dict[str, float]

a dict with keys corresponding to objective function symbols and values to reference point components, i.e., aspiration levels.

required
ideal dict[str, float]

ideal point values. If not given, attempt will be made to calculate ideal point from problem.

None
nadir dict[str, float]

nadir point values. If not given, attempt will be made to calculate nadir point from problem.

None
rho float

a small scalar value to scale the sum in the objective function of the scalarization. Defaults to 1e-6.

1e-06
delta float

a small scalar to define the utopian point. Defaults to 1e-6.

1e-06

Returns:

Type Description
tuple[Problem, str]

tuple[Problem, str]: a tuple with the copy of the problem with the added scalarization and the symbol of the added scalarization.

Todo

Add reference in augmentation term option!

Source code in desdeo/tools/scalarization.py
def add_asf_diff(
    problem: Problem,
    symbol: str,
    reference_point: dict[str, float],
    ideal: dict[str, float] | None = None,
    nadir: dict[str, float] | None = None,
    rho: float = 1e-6,
    delta: float = 1e-6,
) -> tuple[Problem, str]:
    r"""Adds the differentiable variant of the achievement scalarizing function.

    The scalarized problem reads

    \begin{align*}
        \min \quad & \alpha + \rho \sum_{i=1}^k \frac{f_i(\mathbf{x})}{z_i^\text{nad} - z_i^{\star\star}} \\
        \text{s.t.} \quad & \frac{f_i(\mathbf{x}) - \bar{z}_i}{z_i^\text{nad}
        - z_i^{\star\star}} - \alpha \leq 0,\\
        & \mathbf{x} \in S,
    \end{align*}

    where $f_i$ are objective functions, $z_i^{\star\star} = z_i^\star - \delta$ is
    a component of the utopian point, $\bar{z}_i$ is a component of the reference point,
    $\rho$ and $\delta$ are small scalar values, $S$ is the feasible solution
    space of the original problem, and $\alpha$ is an auxiliary variable.

    The auxiliary variable is added with the symbol `_alpha`, and one constraint
    with the symbol `<objective symbol>_con` is added for each objective.

    References:
        Wierzbicki, A. P. (1982). A mathematical basis for satisficing decision
            making. Mathematical modelling, 3(5), 391-405.

    Args:
        problem (Problem): the problem the scalarization is added to.
        symbol (str): the symbol given to the added scalarization.
        reference_point (dict[str, float]): a dict with keys corresponding to objective
            function symbols and values to reference point components, i.e.,
            aspiration levels.
        ideal (dict[str, float], optional): ideal point values, used as given. If not given,
            the corrected ideal point of the problem is used.
        nadir (dict[str, float], optional): nadir point values, used as given. If not given,
            the corrected nadir point of the problem is used.
        rho (float, optional): a small scalar value to scale the sum in the objective
            function of the scalarization. Defaults to 1e-6.
        delta (float, optional): a small scalar to define the utopian point. Defaults to 1e-6.

    Raises:
        ScalarizationError: if the reference point is missing a value for an objective,
            or if the ideal or nadir point is neither given nor defined in the problem.

    Returns:
        tuple[Problem, str]: a tuple with the copy of the problem with the added
            scalarization and the symbol of the added scalarization.

    Todo:
        Add reference in augmentation term option!
    """
    # the reference point must cover every objective
    if not objective_dict_has_all_symbols(problem, reference_point):
        msg = f"The give reference point {reference_point} is missing value for one or more objectives."
        raise ScalarizationError(msg)

    # an explicitly given ideal point wins; otherwise fall back to the problem's own
    if ideal is None:
        if problem.get_ideal_point() is None:
            msg = "Ideal point not defined!"
            raise ScalarizationError(msg)
        ideal_point = get_corrected_ideal(problem)
    else:
        ideal_point = ideal

    # same resolution order for the nadir point
    if nadir is None:
        if problem.get_nadir_point() is None:
            msg = "Nadir point not defined!"
            raise ScalarizationError(msg)
        nadir_point = get_corrected_nadir(problem)
    else:
        nadir_point = nadir

    corrected_rp = flip_maximized_objective_values(problem, reference_point)

    # unbounded auxiliary variable alpha
    alpha = Variable(
        name="alpha",
        symbol="_alpha",
        variable_type=VariableTypeEnum.real,
        lowerbound=float("-inf"),
        upperbound=float("inf"),
        initial_value=1.0,
    )

    # per-objective scaling "(nadir - utopian)", shared by the augmentation term and the constraints
    scaling = {
        obj.symbol: f"({nadir_point[obj.symbol]} - {ideal_point[obj.symbol] - delta})" for obj in problem.objectives
    }

    # objective function of the scalarization: alpha plus the scaled augmentation sum
    aug_expr = " + ".join(f"{obj.symbol}_min / {scaling[obj.symbol]}" for obj in problem.objectives)
    target_expr = f"_alpha + {rho}*({aug_expr})"

    scalarization = ScalarizationFunction(
        name="ASF scalarization objective function",
        symbol=symbol,
        func=target_expr,
        is_linear=problem.is_linear,
        is_convex=problem.is_convex,
        is_twice_differentiable=problem.is_twice_differentiable,
    )

    # one "scaled distance to the reference point minus alpha <= 0" constraint per objective;
    # each constraint takes its linearity, convexity and differentiability flags from its objective
    constraints = [
        Constraint(
            name=f"Constraint for {obj.symbol}",
            symbol=f"{obj.symbol}_con",
            func=f"({obj.symbol}_min - {corrected_rp[obj.symbol]}) / {scaling[obj.symbol]} - _alpha",
            cons_type=ConstraintTypeEnum.LTE,
            is_linear=obj.is_linear,
            is_convex=obj.is_convex,
            is_twice_differentiable=obj.is_twice_differentiable,
        )
        for obj in problem.objectives
    ]

    with_alpha = problem.add_variables([alpha])
    with_scalarization = with_alpha.add_scalarization(scalarization)
    return with_scalarization.add_constraints(constraints), symbol

add_asf_generic_diff

add_asf_generic_diff(
    problem: Problem,
    symbol: str,
    reference_point: dict[str, float],
    weights: dict[str, float],
    reference_point_aug: dict[str, float] | None = None,
    weights_aug: dict[str, float] | None = None,
    rho: float = 1e-06,
) -> tuple[Problem, str]

Adds the differentiable variant of the generic achievement scalarizing function.

\[\begin{align*} \min \quad & \alpha + \rho \sum_{i=1}^k \frac{f_i(\mathbf{x})}{w_i} \\ \text{s.t.} \quad & \frac{f_i(\mathbf{x}) - q_i}{w_i} - \alpha \leq 0,\\ & \mathbf{x} \in S, \end{align*}\]

where \(f_i\) are objective functions, \(q_i\) is a component of the reference point, \(w_i\) are components of the weight vector (which are assumed to be positive), \(\rho\) is a small scalar value, \(S\) is the feasible solution space of the original problem, and \(\alpha\) is an auxiliary variable. The summation term in the scalarization is known as the augmentation term. If a reference point is chosen to be used in the augmentation term, i.e., a separate reference point for the augmentation term is given (reference_point_aug), then its components are subtracted from the objective function values in the numerator of the augmentation term. That is:

\[\begin{align*} \min \quad & \alpha + \rho \sum_{i=1}^k \frac{f_i(\mathbf{x}) - q_i}{w_i} \\ \text{s.t.} \quad & \frac{f_i(\mathbf{x}) - q_i}{w_i} - \alpha \leq 0,\\ & \mathbf{x} \in S, \end{align*}\]
References

Wierzbicki, A. P. (1982). A mathematical basis for satisficing decision making. Mathematical modelling, 3(5), 391-405.

Parameters:

Name Type Description Default
problem Problem

the problem the scalarization is added to.

required
symbol str

the symbol given to the added scalarization.

required
reference_point dict[str, float]

a dict with keys corresponding to objective function symbols and values to reference point components, i.e., aspiration levels.

required
weights dict[str, float]

the weights to be used in the scalarization function. Must be positive.

required
reference_point_aug dict[str, float]

a dict with keys corresponding to objective function symbols and values to reference point components for the augmentation term, i.e., aspiration levels. Defaults to None.

None
weights_aug dict[str, float]

the weights to be used in the scalarization function's augmentation term. Must be positive. Defaults to None.

None
rho float

a small scalar value to scale the sum in the objective function of the scalarization. Defaults to 1e-6.

1e-06

Returns:

Type Description
tuple[Problem, str]

tuple[Problem, str]: a tuple with the copy of the problem with the added scalarization and the symbol of the added scalarization.

Source code in desdeo/tools/scalarization.py
def add_asf_generic_diff(
    problem: Problem,
    symbol: str,
    reference_point: dict[str, float],
    weights: dict[str, float],
    reference_point_aug: dict[str, float] | None = None,
    weights_aug: dict[str, float] | None = None,
    rho: float = 1e-6,
) -> tuple[Problem, str]:
    r"""Adds the differentiable variant of the generic achievement scalarizing function.

    The scalarized problem reads

    \begin{align*}
        \min \quad & \alpha + \rho \sum_{i=1}^k \frac{f_i(\mathbf{x})}{w_i} \\
        \text{s.t.} \quad & \frac{f_i(\mathbf{x}) - q_i}{w_i} - \alpha \leq 0,\\
        & \mathbf{x} \in S,
    \end{align*}

    where $f_i$ are objective functions, $q_i$ is a component of the reference point,
    $w_i$ are components of the weight vector (which are assumed to be positive),
    $\rho$ is a small scalar value, $S$ is the feasible solution
    space of the original problem, and $\alpha$ is an auxiliary variable.
    The summation term in the scalarization is known as the _augmentation term_.
    If a separate reference point for the augmentation term is given
    (`reference_point_aug`), then its components are subtracted from the objective
    function values in the numerator of the augmentation term. That is:

    \begin{align*}
        \min \quad & \alpha + \rho \sum_{i=1}^k \frac{f_i(\mathbf{x}) - q_i}{w_i} \\
        \text{s.t.} \quad & \frac{f_i(\mathbf{x}) - q_i}{w_i} - \alpha \leq 0,\\
        & \mathbf{x} \in S,
    \end{align*}

    If `weights_aug` is given, it replaces `weights` in the augmentation term only;
    the constraints always use `weights` and `reference_point`.

    The auxiliary variable is added with the symbol `_alpha`, and one constraint
    with the symbol `<objective symbol>_con` is added for each objective.

    References:
        Wierzbicki, A. P. (1982). A mathematical basis for satisficing decision
            making. Mathematical modelling, 3(5), 391-405.

    Args:
        problem (Problem): the problem the scalarization is added to.
        symbol (str): the symbol given to the added scalarization.
        reference_point (dict[str, float]): a dict with keys corresponding to objective
            function symbols and values to reference point components, i.e.,
            aspiration levels.
        weights (dict[str, float]): the weights to be used in the scalarization function. Must be positive.
        reference_point_aug (dict[str, float], optional): a dict with keys corresponding to objective
            function symbols and values to reference point components for the augmentation term, i.e.,
            aspiration levels. Defaults to None.
        weights_aug (dict[str, float], optional): the weights to be used in the scalarization function's
            augmentation term. Must be positive. Defaults to None.
        rho (float, optional): a small scalar value to scale the sum in the objective
            function of the scalarization. Defaults to 1e-6.

    Raises:
        ScalarizationError: if any of the given reference points or weight vectors
            is missing a value for one or more objectives.

    Returns:
        tuple[Problem, str]: a tuple with the copy of the problem with the added
            scalarization and the symbol of the added scalarization.
    """
    # validate the mandatory reference point
    if not objective_dict_has_all_symbols(problem, reference_point):
        msg = f"The give reference point {reference_point} is missing a value for one or more objectives."
        raise ScalarizationError(msg)

    # validate the optional reference point of the augmentation term
    if reference_point_aug is not None:
        if not objective_dict_has_all_symbols(problem, reference_point_aug):
            msg = (
                f"The given reference point for the augmentation term {reference_point_aug} "
                "does not have a component defined for all the objectives."
            )
            raise ScalarizationError(msg)

    # validate the mandatory weight vector
    if not objective_dict_has_all_symbols(problem, weights):
        msg = f"The given weight vector {weights} is missing a value for one or more objectives."
        raise ScalarizationError(msg)

    # validate the optional weight vector of the augmentation term
    if weights_aug is not None:
        if not objective_dict_has_all_symbols(problem, weights_aug):
            msg = f"The given weight vector {weights_aug} is missing a value for one or more objectives."
            raise ScalarizationError(msg)

    corrected_rp = flip_maximized_objective_values(problem, reference_point)
    corrected_rp_aug = (
        None if reference_point_aug is None else flip_maximized_objective_values(problem, reference_point_aug)
    )

    # unbounded auxiliary variable alpha
    alpha = Variable(
        name="alpha",
        symbol="_alpha",
        variable_type=VariableTypeEnum.real,
        lowerbound=float("-inf"),
        upperbound=float("inf"),
        initial_value=1.0,
    )

    # The augmentation term varies along two independent choices:
    # which weights divide each term, and whether a reference point is subtracted.
    aug_term_weights = weights if weights_aug is None else weights_aug
    if reference_point_aug is None:
        aug_terms = [f"({obj.symbol}_min / {aug_term_weights[obj.symbol]})" for obj in problem.objectives]
    else:
        aug_terms = [
            f"(({obj.symbol}_min - {corrected_rp_aug[obj.symbol]}) / {aug_term_weights[obj.symbol]})"
            for obj in problem.objectives
        ]
    aug_expr = " + ".join(aug_terms)

    target_expr = f"_alpha + {rho}*({aug_expr})"
    scalarization = ScalarizationFunction(
        name="Generic ASF scalarization objective function",
        symbol=symbol,
        func=target_expr,
        is_convex=problem.is_convex,
        is_linear=problem.is_linear,
        is_twice_differentiable=problem.is_twice_differentiable,
    )

    # Each constraint takes its linearity, convexity and differentiability flags
    # from the objective it is built on.
    constraints = [
        Constraint(
            name=f"Constraint for {obj.symbol}",
            symbol=f"{obj.symbol}_con",
            func=f"({obj.symbol}_min - {corrected_rp[obj.symbol]}) / {weights[obj.symbol]} - _alpha",
            cons_type=ConstraintTypeEnum.LTE,
            is_linear=obj.is_linear,
            is_convex=obj.is_convex,
            is_twice_differentiable=obj.is_twice_differentiable,
        )
        for obj in problem.objectives
    ]

    with_alpha = problem.add_variables([alpha])
    with_scalarization = with_alpha.add_scalarization(scalarization)
    return with_scalarization.add_constraints(constraints), symbol

add_asf_generic_nondiff

add_asf_generic_nondiff(
    problem: Problem,
    symbol: str,
    reference_point: dict[str, float],
    weights: dict[str, float],
    reference_point_aug: dict[str, float] | None = None,
    weights_aug: dict[str, float] | None = None,
    rho: float = 1e-06,
) -> tuple[Problem, str]

Adds the generic achievement scalarizing function to a problem with the given reference point, and weights.

This is the non-differentiable variant of the generic achievement scalarizing function, which means the resulting scalarization function is non-differentiable. Compared to add_asf_nondiff, this variant is useful, when the problem being scalarized does not have a defined ideal or nadir point, or both. The weights should be non-zero to avoid zero division.

The scalarization is defined as follows:

\[\begin{equation} \mathcal{S}_\text{ASF}(F(\mathbf{x}); \mathbf{q}, \mathbf{w}) = \underset{i=1,\ldots,k}{\text{max}} \left[ \frac{f_i(\mathbf{x}) - q_i}{w_i} \right] + \rho\sum_{i=1}^{k} \frac{f_i(\mathbf{x})}{w_i}, \end{equation}\]

where \(\mathbf{q} = [q_1,\dots,q_k]\) is a reference point, \(\mathbf{w} = [w_1,\dots,w_k]\) are weights, \(k\) is the number of objective functions, and \(\rho\) is a small scalar value. The summation term in the scalarization is known as the augmentation term. If a reference point is chosen to be used in the augmentation term, i.e., a separate reference point for the augmentation term is given (reference_point_aug), then its components are subtracted from the objective function values in the numerator of the augmentation term. That is:

\[\begin{equation} \mathcal{S}_\text{ASF}(F(\mathbf{x}); \mathbf{q}, \mathbf{w}) = \underset{i=1,\ldots,k}{\text{max}} \left[ \frac{f_i(\mathbf{x}) - q_i}{w_i} \right] + \rho\sum_{i=1}^{k} \frac{f_i(\mathbf{x}) - q_i}{w_i}. \end{equation}\]

Parameters:

Name Type Description Default
problem Problem

the problem to which the scalarization function should be added.

required
symbol str

the symbol to reference the added scalarization function.

required
reference_point dict[str, float]

a reference point with as many components as there are objectives.

required
weights dict[str, float]

the weights to be used in the scalarization function. must be positive.

required
reference_point_aug dict[str, float]

a dict with keys corresponding to objective function symbols and values to reference point components for the augmentation term, i.e., aspiration levels. Defaults to None.

None
weights_aug dict[str, float]

the weights to be used in the scalarization function's augmentation term. Must be positive. Defaults to None.

None
rho float

the weight factor used in the augmentation term. Defaults to 0.000001.

1e-06

Raises:

Type Description
ScalarizationError

If either the reference point or the weights given are missing any of the objective components.

ScalarizationError

If any of the ideal or nadir point values are undefined (None).

Returns:

Type Description
tuple[Problem, str]

tuple[Problem, str]: A tuple containing a copy of the problem with the scalarization function added, and the symbol of the added scalarization function.

Source code in desdeo/tools/scalarization.py
def add_asf_generic_nondiff(
    problem: Problem,
    symbol: str,
    reference_point: dict[str, float],
    weights: dict[str, float],
    reference_point_aug: dict[str, float] | None = None,
    weights_aug: dict[str, float] | None = None,
    rho: float = 0.000001,
) -> tuple[Problem, str]:
    r"""Add the generic (weight-based) achievement scalarizing function to a problem.

    This is the non-differentiable variant: the scalarization contains a `Max` term.
    In contrast to `add_asf_nondiff`, the terms are scaled by user-supplied weights
    rather than by the ideal and nadir points, so neither of those needs to be
    available. The weights should be non-zero to avoid division by zero.

    The scalarization is

    \begin{equation}
        \mathcal{S}_\text{ASF}(F(\mathbf{x}); \mathbf{q}, \mathbf{w}) =
        \underset{i=1,\ldots,k}{\text{max}}
        \left[
        \frac{f_i(\mathbf{x}) - q_i}{w_i}
        \right]
        + \rho\sum_{i=1}^{k} \frac{f_i(\mathbf{x})}{w_i},
    \end{equation}

    where $\mathbf{q} = [q_1,\dots,q_k]$ is the reference point, $\mathbf{w} =
    [w_1,\dots,w_k]$ are the weights, $k$ is the number of objective functions, and
    $\rho$ is a small scalar. The sum is the _augmentation term_. When
    `reference_point_aug` is given, its components are subtracted from the objective
    function values in the numerator of the augmentation term:

    \begin{equation}
        \mathcal{S}_\text{ASF}(F(\mathbf{x}); \mathbf{q}, \mathbf{w}) =
        \underset{i=1,\ldots,k}{\text{max}}
        \left[
        \frac{f_i(\mathbf{x}) - q_i}{w_i}
        \right]
        + \rho\sum_{i=1}^{k} \frac{f_i(\mathbf{x}) - q_i}{w_i}.
    \end{equation}

    When `weights_aug` is given, it replaces `weights` in the augmentation term only.

    Args:
        problem (Problem): the problem to which the scalarization function should be added.
        symbol (str): the symbol to reference the added scalarization function.
        reference_point (dict[str, float]): a reference point with a component for every objective.
        weights (dict[str, float]): the weights used in the max term (and in the augmentation
            term unless `weights_aug` is given). Must be positive.
        reference_point_aug (dict[str, float], optional): reference point components, keyed by
            objective symbol, used only in the augmentation term. Defaults to None.
        weights_aug (dict[str, float], optional): weights used only in the augmentation term.
            Must be positive. Defaults to None.
        rho (float, optional): the weight factor of the augmentation term. Defaults to 0.000001.

    Raises:
        ScalarizationError: If `reference_point`, `weights`, or (when given)
            `reference_point_aug` or `weights_aug` lacks a value for some objective.

    Returns:
        tuple[Problem, str]: the result of `problem.add_scalarization` for the new
            scalarization function, and the symbol of that function.
    """
    # Validate the mandatory inputs and, when present, the augmentation-specific ones.
    # The order of the checks decides which error is reported first.
    if not objective_dict_has_all_symbols(problem, reference_point):
        msg = f"The give reference point {reference_point} is missing a value for one or more objectives."
        raise ScalarizationError(msg)

    has_aug_reference = reference_point_aug is not None
    if has_aug_reference and not objective_dict_has_all_symbols(problem, reference_point_aug):
        msg = (
            f"The given reference point for the augmentation term {reference_point_aug} "
            "does not have a component defined for all the objectives."
        )
        raise ScalarizationError(msg)

    if not objective_dict_has_all_symbols(problem, weights):
        msg = f"The given weight vector {weights} is missing a value for one or more objectives."
        raise ScalarizationError(msg)

    if weights_aug is not None and not objective_dict_has_all_symbols(problem, weights_aug):
        msg = f"The given weight vector {weights_aug} is missing a value for one or more objectives."
        raise ScalarizationError(msg)

    # Reference points are passed through the maximization-flipping helper so that
    # they can be combined with the '_min' objective symbols.
    flipped_rp = flip_maximized_objective_values(problem, reference_point)
    if has_aug_reference:
        flipped_rp_aug = flip_maximized_objective_values(problem, reference_point_aug)

    objective_symbols = [objective.symbol for objective in problem.objectives]

    # Max term: Max((f_i_min - q_i) / (w_i), ...)
    max_args = ", ".join(f"({sym}_min - {flipped_rp[sym]}) / ({weights[sym]})" for sym in objective_symbols)
    max_term = f"{Op.MAX}({max_args})"

    # Augmentation term: dedicated weights win over the shared ones; the reference
    # point is subtracted only when a dedicated one was supplied.
    aug_weights = weights if weights_aug is None else weights_aug
    if has_aug_reference:
        aug_parts = [f"(({sym}_min - {flipped_rp_aug[sym]}) / {aug_weights[sym]})" for sym in objective_symbols]
    else:
        aug_parts = [f"({sym}_min / {aug_weights[sym]})" for sym in objective_symbols]
    aug_expr = " + ".join(aug_parts)

    # Register the assembled expression with the problem.
    generic_asf = ScalarizationFunction(
        name="Generic achievement scalarizing function",
        symbol=symbol,
        func=f"{max_term} + {rho} * ({aug_expr})",
        is_linear=False,
        is_convex=False,
        is_twice_differentiable=False,
    )
    return problem.add_scalarization(generic_asf), symbol

add_asf_nondiff

add_asf_nondiff(
    problem: Problem,
    symbol: str,
    reference_point: dict[str, float],
    ideal: dict[str, float] | None = None,
    nadir: dict[str, float] | None = None,
    delta: float = 1e-06,
    rho: float = 1e-06,
    *,
    reference_in_aug=False,
) -> tuple[Problem, str]

Add the achievement scalarizing function for a problem with the reference point.

This is the non-differentiable variant of the achievement scalarizing function, which means the resulting scalarization function is non-differentiable. Requires that the ideal and nadir point have been defined for the problem.

The scalarization is defined as follows:

\[\begin{equation} \mathcal{S}_\text{ASF}(F(\mathbf{x}); \mathbf{q}, \mathbf{z}^\star, \mathbf{z}^\text{nad}) = \underset{i=1,\ldots,k}{\text{max}} \left[ \frac{f_i(\mathbf{x}) - q_i}{z^\text{nad}_i - (z_i^\star - \delta)} \right] + \rho\sum_{i=1}^{k} \frac{f_i(\mathbf{x})}{z_i^\text{nad} - (z_i^\star - \delta)}, \end{equation}\]

where \(\mathbf{q} = [q_1,\dots,q_k]\) is a reference point, \(\mathbf{z^\star} = [z_1^\star,\dots,z_k^\star]\) is the ideal point, \(\mathbf{z}^\text{nad} = [z_1^\text{nad},\dots,z_k^\text{nad}]\) is the nadir point, \(k\) is the number of objective functions, and \(\delta\) and \(\rho\) are small scalar values. The summation term in the scalarization is known as the augmentation term. If the reference point is chosen to be used in the augmentation term (reference_in_aug=True), then the reference point components are subtracted from the objective function values in the numerator of the augmentation term. That is:

\[\begin{equation} \mathcal{S}_\text{ASF}(F(\mathbf{x}); \mathbf{q}, \mathbf{z}^\star, \mathbf{z}^\text{nad}) = \underset{i=1,\ldots,k}{\text{max}} \left[ \frac{f_i(\mathbf{x}) - q_i}{z^\text{nad}_i - (z_i^\star - \delta)} \right] + \rho\sum_{i=1}^{k} \frac{f_i(\mathbf{x}) - q_i}{z_i^\text{nad} - (z_i^\star - \delta)}. \end{equation}\]

Parameters:

Name Type Description Default
problem Problem

the problem to which the scalarization function should be added.

required
symbol str

the symbol to reference the added scalarization function.

required
reference_point dict[str, float]

a reference point as an objective dict.

required
ideal dict[str, float]

ideal point values. If not given, attempt will be made to calculate ideal point from problem.

None
nadir dict[str, float]

nadir point values. If not given, attempt will be made to calculate nadir point from problem.

None
delta float

the scalar value used to define the utopian point (ideal - delta). Defaults to 0.000001.

1e-06
rho float

the weight factor used in the augmentation term. Defaults to 0.000001.

1e-06
reference_in_aug bool

whether the reference point should be used in the augmentation term as well. Defaults to False.

False

Raises:

Type Description
ScalarizationError

there are missing elements in the reference point, or if any of the ideal or nadir point values are undefined (None).

Returns:

Type Description
tuple[Problem, str]

tuple[Problem, str]: A tuple containing a copy of the problem with the scalarization function added, and the symbol of the added scalarization function.

Source code in desdeo/tools/scalarization.py
def add_asf_nondiff(
    problem: Problem,
    symbol: str,
    reference_point: dict[str, float],
    ideal: dict[str, float] | None = None,
    nadir: dict[str, float] | None = None,
    delta: float = 0.000001,
    rho: float = 0.000001,
    *,
    reference_in_aug: bool = False,
) -> tuple[Problem, str]:
    r"""Add the achievement scalarizing function for a problem with the reference point.

    This is the non-differentiable variant of the achievement scalarizing function, which
    means the resulting scalarization function is non-differentiable.
    Requires that the ideal and nadir points are either passed in or defined for the problem.

    The scalarization is defined as follows:

    \begin{equation}
        \mathcal{S}_\text{ASF}(F(\mathbf{x}); \mathbf{q}, \mathbf{z}^\star, \mathbf{z}^\text{nad}) =
        \underset{i=1,\ldots,k}{\text{max}}
        \left[
        \frac{f_i(\mathbf{x}) - q_i}{z^\text{nad}_i - (z_i^\star - \delta)}
        \right]
        + \rho\sum_{i=1}^{k} \frac{f_i(\mathbf{x})}{z_i^\text{nad} - (z_i^\star - \delta)},
    \end{equation}

    where $\mathbf{q} = [q_1,\dots,q_k]$ is a reference point, $\mathbf{z^\star} = [z_1^\star,\dots,z_k^\star]$
    is the ideal point, $\mathbf{z}^\text{nad} = [z_1^\text{nad},\dots,z_k^\text{nad}]$ is the nadir point, $k$
    is the number of objective functions, and $\delta$ and $\rho$ are small scalar values. The summation term
    in the scalarization is known as the _augmentation term_. If the reference point is chosen to
    be used in the augmentation term (`reference_in_aug=True`), then
    the reference point components are subtracted from the objective function values in the numerator
    of the augmentation term. That is:

    \begin{equation}
        \mathcal{S}_\text{ASF}(F(\mathbf{x}); \mathbf{q}, \mathbf{z}^\star, \mathbf{z}^\text{nad}) =
        \underset{i=1,\ldots,k}{\text{max}}
        \left[
        \frac{f_i(\mathbf{x}) - q_i}{z^\text{nad}_i - (z_i^\star - \delta)}
        \right]
        + \rho\sum_{i=1}^{k} \frac{f_i(\mathbf{x}) - q_i}{z_i^\text{nad} - (z_i^\star - \delta)}.
    \end{equation}

    Args:
        problem (Problem): the problem to which the scalarization function should be added.
        symbol (str): the symbol to reference the added scalarization function.
        reference_point (dict[str, float]): a reference point as an objective dict. Components of
            maximized objectives are multiplied by -1 in the generated expression.
        ideal (dict[str, float], optional): ideal point values, used as given. If not given,
            the corrected ideal point of the problem is used.
        nadir (dict[str, float], optional): nadir point values, used as given. If not given,
            the corrected nadir point of the problem is used.
        delta (float, optional): the scalar value used to define the utopian point (ideal - delta).
            Defaults to 0.000001.
        rho (float, optional): the weight factor used in the augmentation term. Defaults to 0.000001.
        reference_in_aug (bool): whether the reference point should be used in
            the augmentation term as well. Defaults to False.

    Raises:
        ScalarizationError: there are missing elements in the reference point, the ideal or nadir
            point is neither given nor defined for the problem, or any of the ideal or nadir
            point values are undefined (None).

    Returns:
        tuple[Problem, str]: A tuple containing a copy of the problem with the scalarization function added,
            and the symbol of the added scalarization function.
    """
    # check that the reference point has all the objective components
    if not objective_dict_has_all_symbols(problem, reference_point):
        msg = f"The given reference point {reference_point} does not have a component defined for all the objectives."
        raise ScalarizationError(msg)

    # an explicitly given ideal point takes precedence over the one stored in the problem
    if ideal is not None:
        ideal_point = ideal
    elif problem.get_ideal_point() is not None:
        ideal_point = get_corrected_ideal(problem)
    else:
        msg = "Ideal point not defined!"
        raise ScalarizationError(msg)

    # an explicitly given nadir point takes precedence over the one stored in the problem
    if nadir is not None:
        nadir_point = nadir
    elif problem.get_nadir_point() is not None:
        nadir_point = get_corrected_nadir(problem)
    else:
        msg = "Nadir point not defined!"
        raise ScalarizationError(msg)

    if any(value is None for value in ideal_point.values()) or any(value is None for value in nadir_point.values()):
        msg = f"There are undefined values in either the ideal ({ideal_point}) or the nadir point ({nadir_point})."
        raise ScalarizationError(msg)

    # Normalizing denominator shared by the max and augmentation terms:
    # nadir_i - (ideal_i - delta)
    denominators = {
        obj.symbol: f"({nadir_point[obj.symbol]} - ({ideal_point[obj.symbol]} - {delta}))"
        for obj in problem.objectives
    }

    # Objective shifted by the reference point: (f_i_min - q_i), with q_i negated for
    # maximized objectives so that it matches the '_min' form of the objective.
    # Both terms use this single expression. Previously the augmentation term
    # appended a stray "1" to the reference value of minimized objectives
    # (e.g. 0.5 became 0.51).
    shifted = {
        obj.symbol: f"({obj.symbol}_min - {reference_point[obj.symbol]}{' * -1' if obj.maximize else ''})"
        for obj in problem.objectives
    }

    # Build the max term
    max_operands = [f"{shifted[obj.symbol]} / {denominators[obj.symbol]}" for obj in problem.objectives]
    max_term = f"{Op.MAX}({', '.join(max_operands)})"

    # Build the augmentation term, with or without the reference point in the numerator
    if reference_in_aug:
        aug_operands = [f"{shifted[obj.symbol]} / {denominators[obj.symbol]}" for obj in problem.objectives]
    else:
        aug_operands = [f"{obj.symbol}_min / {denominators[obj.symbol]}" for obj in problem.objectives]

    aug_term = " + ".join(aug_operands)

    asf_function = f"{max_term} + {rho} * ({aug_term})"

    # Add the function to the problem
    scalarization_function = ScalarizationFunction(
        name="Achievement scalarizing function",
        symbol=symbol,
        func=asf_function,
        is_linear=False,
        is_convex=False,
        is_twice_differentiable=False,
    )
    return problem.add_scalarization(scalarization_function), symbol

add_desirability_funcs

add_desirability_funcs(
    problem: Problem,
    aspiration_levels: dict[str, float],
    reservation_levels: dict[str, float],
    desirability_levels: dict[str, tuple[float, float]]
    | None = None,
    desirability_func: Literal[
        "Harrington", "MaoMao"
    ] = "Harrington",
) -> tuple[Problem, list[str]]

Adds desirability functions to the problem based on the given aspiration and reservation levels.

Note that the desirability functions are added as scalarization functions to the problem. They are also multiplied by -1 to ensure that "desirability" values can be minimized, as is assumed by the optimizers.

Parameters:

Name Type Description Default
problem Problem

The problem to which the desirability functions should be added.

required
aspiration_levels dict[str, float]

A dictionary with keys corresponding to objective function symbols and values to aspiration levels.

required
reservation_levels dict[str, float]

A dictionary with keys corresponding to objective function symbols and values to reservation levels.

required
desirability_levels dict[str, tuple[float, float]] | None

A dictionary with keys corresponding to objective function symbols and values to desirability levels, where each value is a tuple of (d1, d2). If not given, the default values for d1 and d2 are used, which are 0.9 and 0.1 respectively. Defaults to None.

None
desirability_func str

The type of desirability function to use. Currently, only "Harrington" or "MaoMao" is supported. Defaults to "Harrington".

'Harrington'

Returns:

Name Type Description
Problem Problem

A copy of the problem with the added desirability functions as scalarization functions.

list[str]

list[str]: A list of symbols of the added desirability functions.

Source code in desdeo/tools/scalarization.py
def add_desirability_funcs(
    problem: Problem,
    aspiration_levels: dict[str, float],
    reservation_levels: dict[str, float],
    desirability_levels: dict[str, tuple[float, float]] | None = None,
    desirability_func: Literal["Harrington", "MaoMao"] = "Harrington",
) -> tuple[Problem, list[str]]:
    """Adds desirability functions to the problem based on the given aspiration and reservation levels.

    Note that the desirability functions are added as scalarization functions to the problem. They are also multiplied
    by -1 to ensure that "desirability" values can be minimized, as is assumed by the optimizers.

    Args:
        problem (Problem): The problem to which the desirability functions should be added.
        aspiration_levels (dict[str, float]): A dictionary with keys corresponding to objective function symbols
            and values to aspiration levels.
        reservation_levels (dict[str, float]): A dictionary with keys corresponding to objective function symbols
            and values to reservation levels.
        desirability_levels (dict[str, tuple[float, float]] | None, optional): A dictionary with keys corresponding to
            objective function symbols and values to desirability levels, where each value is a tuple of (d1, d2). If
            not given, the default values for d1 and d2 are used, which are 0.9 and 0.1 respectively. Defaults to None.
        desirability_func (str, optional): The type of desirability function to use. Currently, only "Harrington" or
            "MaoMao" is supported. Defaults to "Harrington".

    Returns:
        Problem: A copy of the problem with the added desirability functions as scalarization functions.
        list[str]: A list of symbols of the added desirability functions.
    """
    if desirability_func == "Harrington":
        create_func = __create_HDF
    elif desirability_func == "MaoMao":
        create_func = __create_MDF
    else:
        raise ScalarizationError(f"Desirability function {desirability_func} is not supported.")

    if desirability_levels is None:
        desirability_levels = {obj.symbol: (0.9, 0.1) for obj in problem.objectives}

    # check that all objectives have aspiration and reservation levels defined
    for obj in problem.objectives:
        if obj.symbol not in aspiration_levels or obj.symbol not in reservation_levels:
            raise ScalarizationError(
                f"Objective {obj.symbol} does not have both aspiration and reservation levels defined."
            )
    maximize: dict[str, int] = {obj.symbol: -1 if obj.maximize else 1 for obj in problem.objectives}
    symbols = []
    problem_: Problem = problem.model_copy(deep=True)
    for obj in problem.objectives:
        d1, d2 = desirability_levels[obj.symbol]
        func = (
            "- ("
            + create_func(
                obj.symbol + "_min",
                aspiration_levels[obj.symbol] * maximize[obj.symbol],
                reservation_levels[obj.symbol] * maximize[obj.symbol],
                d1,
                d2,
            )
            + ")"
        )
        symbols.append(f"{obj.symbol}_d")
        scalarization = ScalarizationFunction(
            name=f"Desirability function for {obj.symbol}",
            symbol=f"{obj.symbol}_d",
            func=func,
            is_linear=False,
            is_convex=False,
            is_twice_differentiable=obj.is_twice_differentiable,
        )
        problem_ = problem_.add_scalarization(scalarization)

    return problem_, symbols

add_epsilon_constraints

add_epsilon_constraints(
    problem: Problem,
    symbol: str,
    constraint_symbols: dict[str, str],
    objective_symbol: str,
    epsilons: dict[str, float],
) -> tuple[Problem, str, list[str]]

Creates expressions for an epsilon constraints scalarization and constraints.

It is assumed that the epsilon values have been given in a format where each objective is to be minimized.

The scalarization is defined as follows:

\[\begin{equation} \begin{aligned} & \operatorname{min}_{\mathbf{x} \in S} & & f_t(\mathbf{x}) \\ & \text{s.t.} & & f_j(\mathbf{x}) \leq \epsilon_j \text{ for all } j = 1, \ldots ,k, \; j \neq t, \end{aligned} \end{equation}\]

where \(\epsilon_j\) are the epsilon bounds used in the epsilon constraints \(f_j(\mathbf{x}) \leq \epsilon_j\), and \(k\) is the number of objective functions.

Parameters:

Name Type Description Default
problem Problem

the problem to scalarize.

required
symbol str

the symbol of the added objective function to be optimized.

required
constraint_symbols dict[str, str]

a dict with the symbols to be used with the added constraints. The key indicates the name of the objective function the constraint is related to, and the value is the symbol to be used when defining the constraint.

required
objective_symbol str

the objective used as the objective in the epsilon constraint scalarization.

required
epsilons dict[str, float]

the epsilon constraint values in a dict with each key being an objective's symbol. The corresponding value is then used as the epsilon value for the respective objective function.

required

Raises:

Type Description
ScalarizationError

objective_symbol not found in problem definition.

Returns:

Type Description
tuple[Problem, str, list[str]]

tuple[Problem, str, list[str]]: A triple with the first element being a copy of the problem with the added epsilon constraints. The second element is the symbol of the objective to be optimized. The last element is a list with the symbols of the added constraints to the problem.

Source code in desdeo/tools/scalarization.py
def add_epsilon_constraints(
    problem: Problem, symbol: str, constraint_symbols: dict[str, str], objective_symbol: str, epsilons: dict[str, float]
) -> tuple[Problem, str, list[str]]:
    r"""Build an epsilon-constraint scalarization together with its constraints.

    The epsilon values are assumed to be given in a form where every objective is
    to be minimized.

    The scalarized problem is

    \begin{equation}
    \begin{aligned}
    & \operatorname{min}_{\mathbf{x} \in S}
    & & f_t(\mathbf{x}) \\
    & \text{s.t.}
    & & f_j(\mathbf{x}) \leq \epsilon_j \text{ for all } j = 1, \ldots ,k, \; j \neq t,
    \end{aligned}
    \end{equation}

    where $\epsilon_j$ are the bounds of the epsilon constraints $f_j(\mathbf{x}) \leq \epsilon_j$,
    and $k$ is the number of objective functions.

    Args:
        problem (Problem): the problem to scalarize.
        symbol (str): the symbol of the added objective function to be optimized.
        constraint_symbols (dict[str, str]): maps an objective's symbol to the symbol
            given to the epsilon constraint created for that objective.
        objective_symbol (str): the symbol of the objective that is optimized in the
            epsilon constraint scalarization.
        epsilons (dict[str, float]): maps an objective's symbol to the epsilon value
            bounding that objective.

    Raises:
        ScalarizationError: `objective_symbol` is not the symbol of any objective of the problem.

    Returns:
        tuple[Problem, str, list[str]]: the problem with the scalarization and the epsilon
            constraints added, the symbol of the objective to be optimized, and the symbols
            of the added constraints.
    """
    correct_symbols = [objective.symbol for objective in problem.objectives]
    if objective_symbol not in correct_symbols:
        msg = f"The given objective symbol {objective_symbol} should be one of {correct_symbols}."
        raise ScalarizationError(msg)

    scalarized, _ = add_objective_as_scalarization(problem, symbol, objective_symbol)

    # One "f_j_min - epsilon_j <= 0" constraint per objective other than the optimized
    # one; the epsilons must be expressed for minimized objectives.
    epsilon_constraints = []
    for objective in problem.objectives:
        if objective.symbol == objective_symbol:
            continue
        bound_expression = ["Add", f"{objective.symbol}_min", ["Negate", epsilons[objective.symbol]]]
        epsilon_constraints.append(
            Constraint(
                name=f"Epsilon for {objective.symbol}",
                symbol=constraint_symbols[objective.symbol],
                func=bound_expression,
                cons_type=ConstraintTypeEnum.LTE,
                is_linear=objective.is_linear,
                is_convex=objective.is_convex,
                is_twice_differentiable=objective.is_twice_differentiable,
            )
        )

    scalarized = scalarized.add_constraints(epsilon_constraints)
    added_symbols = [constraint.symbol for constraint in epsilon_constraints]

    return scalarized, symbol, added_symbols

add_guess_sf_diff

add_guess_sf_diff(
    problem: Problem,
    symbol: str,
    reference_point: dict[str, float],
    ideal: dict[str, float] | None = None,
    nadir: dict[str, float] | None = None,
    rho: float = 1e-06,
    delta: float = 1e-06,
) -> tuple[Problem, str]

Adds the differentiable variant of the GUESS scalarizing function.

\[\begin{align*} \min \quad & \alpha + \rho \sum_{i=1}^k \frac{f_i(\mathbf{x})}{z_i^{nad} - \bar{z}_i}, \quad & \\ \text{s.t.} \quad & \frac{f_i(\mathbf{x}) - z_i^{nad}}{z_i^{nad} - \bar{z}_i} - \alpha \leq 0 \quad & \forall i \notin I^{\diamond},\\ & \mathbf{x} \in S, \end{align*}\]

where \(f_{i}\) are objective functions, \(z_{i}^{nad}\) is a component of the nadir point, \(\bar{z}_{i}\) is a component of the reference point, \(\rho\) is a small scalar value, and \(S\) is the feasible solution space of the original problem. The index set \(I^\diamond\) represents the objective functions whose values are free to change. The indices belonging to this set are interpreted as those objective functions whose components in the reference point are set to the respective nadir point components of the problem. Note that in Buchanan (1997), the GUESS method considers all objective functions, i.e. \(I^\diamond\) is an empty set. The functionality to have free-to-change objectives was added in Miettinen & Mäkelä (2006).

References

Buchanan, J. T. (1997). A naive approach for solving MCDM problems: The GUESS method. Journal of the Operational Research Society, 48, 202-206.

Miettinen, K., & Mäkelä, M. M. (2006). Synchronous approach in interactive multiobjective optimization. European Journal of Operational Research, 170(3), 909-922.

Parameters:

Name Type Description Default
problem Problem

the problem the scalarization is added to.

required
symbol str

the symbol given to the added scalarization.

required
reference_point dict[str, float]

a dict with keys corresponding to objective function symbols and values to reference point components, i.e., aspiration levels.

required
ideal dict[str, float]

ideal point values. If not given, attempt will be made to calculate ideal point from problem.

None
nadir dict[str, float]

nadir point values. If not given, attempt will be made to calculate nadir point from problem.

None
rho float

a small scalar value to scale the sum in the objective function of the scalarization. Defaults to 1e-6.

1e-06
delta float

a small scalar value to define the utopian point. Defaults to 1e-6.

1e-06

Returns:

Type Description
tuple[Problem, str]

tuple[Problem, str]: a tuple with the copy of the problem with the added scalarization and the symbol of the added scalarization.

Source code in desdeo/tools/scalarization.py
def add_guess_sf_diff(
    problem: Problem,
    symbol: str,
    reference_point: dict[str, float],
    ideal: dict[str, float] | None = None,
    nadir: dict[str, float] | None = None,
    rho: float = 1e-6,
    delta: float = 1e-6,
) -> tuple[Problem, str]:
    r"""Adds the differentiable variant of the GUESS scalarizing function.

    \begin{align*}
        \min \quad & \alpha + \rho \sum_{i=1}^k \frac{f_i(\mathbf{x})}{z_i^{nad} - \bar{z}_i},
        \quad & \\
        \text{s.t.} \quad & \frac{f_i(\mathbf{x}) - z_i^{nad}}{z_i^{nad} - \bar{z}_i}
         - \alpha \leq 0 \quad & \forall i \notin I^{\diamond},\\
        & \mathbf{x} \in S,
    \end{align*}

    where $f_{i}$ are objective functions, $z_{i}^{nad}$ is a component of the
    nadir point, $\bar{z}_{i}$
    is a component of the reference point, $\rho$ is a small scalar
    value, and $S$ is the feasible solution space of the original problem. The
    index set $I^\diamond$ represents the objective functions whose values are
    free to change. The indices belonging to this set are those of the objective
    functions whose reference point components are set to (or are greater than)
    the respective nadir point components of the problem. Note that in
    Buchanan (1997), the GUESS method considers all objective functions, i.e.
    $I^\diamond$ is an empty set. The functionality to have free-to-change
    objectives was added in Miettinen & Mäkelä (2006).

    Note:
        In the augmentation term, the objectives belonging to $I^\diamond$ are
        normalized with $z_i^{nad} - z_i^{\star}$ (nadir minus ideal) instead of
        $z_i^{nad} - \bar{z}_i$, because for these objectives the latter is
        (numerically) zero or negative.

    References:
        Buchanan, J. T. (1997). A naive approach for solving MCDM problems: The
        GUESS method. Journal of the Operational Research Society, 48, 202-206.

        Miettinen, K., & Mäkelä, M. M. (2006). Synchronous approach in interactive
        multiobjective optimization. European Journal of Operational Research,
        170(3), 909-922.

    Args:
        problem (Problem): the problem the scalarization is added to.
        symbol (str): the symbol given to the added scalarization.
        reference_point (dict[str, float]): a dict with keys corresponding to objective
            function symbols and values to reference point components, i.e.,
            aspiration levels.
        ideal (dict[str, float], optional): ideal point values. If not given, attempt will be made
            to calculate ideal point from problem.
        nadir (dict[str, float], optional): nadir point values. If not given, attempt will be made
            to calculate nadir point from problem.
        rho (float, optional): a small scalar value to scale the sum in the objective
            function of the scalarization. Defaults to 1e-6.
        delta (float, optional): a small scalar value to define the utopian point. Defaults to 1e-6.
            Currently not used by this function; it is kept in the signature so that
            existing callers keep working.

    Raises:
        ScalarizationError: if the reference point is missing a value for some objective,
            if the ideal or the nadir point is neither given nor defined in the problem,
            or if every objective is free to change (the auxiliary variable would then
            be unbounded from below).

    Returns:
        tuple[Problem, str]: a tuple with the copy of the problem with the added
            scalarization and the symbol of the added scalarization.
    """
    # check reference point
    if not objective_dict_has_all_symbols(problem, reference_point):
        msg = f"The given reference point {reference_point} is missing value for one or more objectives."
        raise ScalarizationError(msg)

    # check if ideal point is specified
    # if not specified, try to calculate corrected ideal point
    if ideal is not None:
        ideal_point = ideal
    elif problem.get_ideal_point() is not None:
        ideal_point = get_corrected_ideal(problem)
    else:
        msg = "Ideal point not defined!"
        raise ScalarizationError(msg)

    # check if nadir point is specified
    # if not specified, try to calculate corrected nadir point
    if nadir is not None:
        nadir_point = nadir
    elif problem.get_nadir_point() is not None:
        nadir_point = get_corrected_nadir(problem)
    else:
        msg = "Nadir point not defined!"
        raise ScalarizationError(msg)

    corrected_rp = flip_maximized_objective_values(problem, reference_point)

    # the indices that are free to change, set if component of reference point
    # has the corresponding nadir value, or if it is greater than the nadir value
    free_to_change = [
        sym
        for sym in corrected_rp
        if np.isclose(corrected_rp[sym], nadir_point[sym]) or corrected_rp[sym] > nadir_point[sym]
    ]

    # only the objectives that are not free to change produce a constraint on _alpha
    constrained_objectives = [obj for obj in problem.objectives if obj.symbol not in free_to_change]

    # without any constraint, _alpha (lower bound -inf) makes the scalarized problem unbounded
    if not constrained_objectives:
        msg = (
            f"The given reference point {reference_point} makes every objective free to change. "
            "At least one reference point component must be better than the respective nadir point component."
        )
        raise ScalarizationError(msg)

    # define the auxiliary variable
    alpha = Variable(
        name="alpha",
        symbol="_alpha",
        variable_type=VariableTypeEnum.real,
        lowerbound=-float("Inf"),
        upperbound=float("Inf"),
        initial_value=1.0,
    )

    # define the objective function of the scalarization
    aug_expr = " + ".join(
        [
            (  # Technically delta should be included (according to the paper), but I'm a rebel and don't want to add it
                f"{obj.symbol}_min / ({nadir_point[obj.symbol]} - {ideal_point[obj.symbol]})"
                if obj.symbol in free_to_change
                else f"{obj.symbol}_min / ({nadir_point[obj.symbol]} - {corrected_rp[obj.symbol]})"
            )
            for obj in problem.objectives
        ]
    )

    target_expr = f"_alpha + {rho}*" + f"({aug_expr})"
    scalarization = ScalarizationFunction(
        name="GUESS scalarization objective function",
        symbol=symbol,
        func=target_expr,
        is_convex=problem.is_convex,
        is_linear=problem.is_linear,
        is_twice_differentiable=problem.is_twice_differentiable,
    )

    # one "normalized distance from nadir minus alpha <= 0" constraint per constrained objective
    constraints = [
        Constraint(
            name=f"Constraint for {obj.symbol}",
            symbol=f"{obj.symbol}_con",
            func=(
                f"({obj.symbol}_min - {nadir_point[obj.symbol]}) / "
                f"({nadir_point[obj.symbol]} - {corrected_rp[obj.symbol]}) - _alpha"
            ),
            cons_type=ConstraintTypeEnum.LTE,
            is_linear=obj.is_linear,
            is_convex=obj.is_convex,
            is_twice_differentiable=obj.is_twice_differentiable,
        )
        for obj in constrained_objectives
    ]

    _problem = problem.add_variables([alpha])
    _problem = _problem.add_scalarization(scalarization)
    return _problem.add_constraints(constraints), symbol

add_guess_sf_nondiff

add_guess_sf_nondiff(
    problem: Problem,
    symbol: str,
    reference_point: dict[str, float],
    ideal: dict[str, float] | None = None,
    nadir: dict[str, float] | None = None,
    rho: float = 1e-06,
) -> tuple[Problem, str]

Adds the non-differentiable variant of the GUESS scalarizing function.

\[\begin{align*} \underset{\mathbf{x}}{\min}\quad & \underset{i \notin I^\diamond}{\max} \left[ \frac{f_i(\mathbf{x}) - z_i^{nad}}{z_i^{nad} - \bar{z}_i} \right] + \rho \sum_{i=1}^k \frac{f_i(\mathbf{x})}{z_i^{nad} - \bar{z}_i}, \quad & \\ \text{s.t.}\quad & \mathbf{x} \in S, \end{align*}\]

where \(f_{i}\) are objective functions, \(z_{i}^{nad}\) is a component of the nadir point, \(\bar{z}_{i}\) is a component of the reference point, \(\rho\) is a small scalar value, and \(S\) is the feasible solution space of the original problem. The index set \(I^\diamond\) represents the objective functions whose values are free to change. The indices belonging to this set are those of the objective functions whose reference point components are set to the respective nadir point components of the problem. Note that in Buchanan (1997), the GUESS method considers all objective functions, i.e. \(I^\diamond\) is an empty set. The functionality to have free-to-change objectives was added in Miettinen & Mäkelä (2006).

References

Buchanan, J. T. (1997). A naive approach for solving MCDM problems: The GUESS method. Journal of the Operational Research Society, 48, 202-206.

Miettinen, K., & Mäkelä, M. M. (2006). Synchronous approach in interactive multiobjective optimization. European Journal of Operational Research, 170(3), 909-922.

Parameters:

Name Type Description Default
problem Problem

the problem the scalarization is added to.

required
symbol str

the symbol given to the added scalarization.

required
reference_point dict[str, float]

a dict with keys corresponding to objective function symbols and values to reference point components, i.e., aspiration levels.

required
ideal dict[str, float]

ideal point values. If not given, attempt will be made to calculate ideal point from problem.

None
nadir dict[str, float]

nadir point values. If not given, attempt will be made to calculate nadir point from problem.

None
rho float

a small scalar value to scale the sum in the objective function of the scalarization. Defaults to 1e-6.

1e-06

Returns:

Type Description
tuple[Problem, str]

tuple[Problem, str]: a tuple with the copy of the problem with the added scalarization and the symbol of the added scalarization.

Source code in desdeo/tools/scalarization.py
def add_guess_sf_nondiff(
    problem: Problem,
    symbol: str,
    reference_point: dict[str, float],
    ideal: dict[str, float] | None = None,
    nadir: dict[str, float] | None = None,
    rho: float = 1e-6,
) -> tuple[Problem, str]:
    r"""Adds the non-differentiable variant of the GUESS scalarizing function.

    \begin{align*}
        \underset{\mathbf{x}}{\min}\quad & \underset{i \notin I^\diamond}{\max}
        \left[
        \frac{f_i(\mathbf{x}) - z_i^{nad}}{z_i^{nad} - \bar{z}_i}
        \right]
        + \rho \sum_{i=1}^k \frac{f_i(\mathbf{x})}{z_i^{nad} - \bar{z}_i},
        \quad & \\
        \text{s.t.}\quad
        & \mathbf{x} \in S,
    \end{align*}

    where $f_{i}$ are objective functions, $z_{i}^{nad}$ is a component of the
    nadir point, $\bar{z}_{i}$
    is a component of the reference point, $\rho$ is a small scalar
    value, and $S$ is the feasible solution space of the original problem. The
    index set $I^\diamond$ represents the objective functions whose values are
    free to change. The indices belonging to this set are those of the objective
    functions whose reference point components are set to (or are greater than)
    the respective nadir point components of the problem. Note that in
    Buchanan (1997), the GUESS method considers all objective functions, i.e.
    $I^\diamond$ is an empty set. The functionality to have free-to-change
    objectives was added in Miettinen & Mäkelä (2006).

    Note:
        In the augmentation term, the objectives belonging to $I^\diamond$ are
        normalized with $z_i^{nad} - z_i^{\star}$ (nadir minus ideal) instead of
        $z_i^{nad} - \bar{z}_i$, because for these objectives the latter is
        (numerically) zero or negative.

    References:
        Buchanan, J. T. (1997). A naive approach for solving MCDM problems: The
        GUESS method. Journal of the Operational Research Society, 48, 202-206.

        Miettinen, K., & Mäkelä, M. M. (2006). Synchronous approach in interactive
        multiobjective optimization. European Journal of Operational Research,
        170(3), 909-922.

    Args:
        problem (Problem): the problem the scalarization is added to.
        symbol (str): the symbol given to the added scalarization.
        reference_point (dict[str, float]): a dict with keys corresponding to objective
            function symbols and values to reference point components, i.e.,
            aspiration levels.
        ideal (dict[str, float], optional): ideal point values. If not given, attempt will be made
            to calculate ideal point from problem.
        nadir (dict[str, float], optional): nadir point values. If not given, attempt will be made
            to calculate nadir point from problem.
        rho (float, optional): a small scalar value to scale the sum in the objective
            function of the scalarization. Defaults to 1e-6.

    Raises:
        ScalarizationError: if the reference point is missing a value for some objective,
            if the ideal or the nadir point is neither given nor defined in the problem,
            or if every objective is free to change (the max term would then be empty).

    Returns:
        tuple[Problem, str]: a tuple with the copy of the problem with the added
            scalarization and the symbol of the added scalarization.
    """
    # check reference point
    if not objective_dict_has_all_symbols(problem, reference_point):
        msg = f"The given reference point {reference_point} is missing value for one or more objectives."
        raise ScalarizationError(msg)

    # check if ideal point is specified
    # if not specified, try to calculate corrected ideal point
    if ideal is not None:
        ideal_point = ideal
    elif problem.get_ideal_point() is not None:
        ideal_point = get_corrected_ideal(problem)
    else:
        msg = "Ideal point not defined!"
        raise ScalarizationError(msg)

    # check if nadir point is specified
    # if not specified, try to calculate corrected nadir point
    if nadir is not None:
        nadir_point = nadir
    elif problem.get_nadir_point() is not None:
        nadir_point = get_corrected_nadir(problem)
    else:
        msg = "Nadir point not defined!"
        raise ScalarizationError(msg)

    corrected_rp = flip_maximized_objective_values(problem, reference_point)

    # the indices that are free to change, set if component of reference point
    # has the corresponding nadir value, or if it is greater than the nadir value
    free_to_change = [
        sym
        for sym in corrected_rp
        if np.isclose(corrected_rp[sym], nadir_point[sym]) or corrected_rp[sym] > nadir_point[sym]
    ]

    # define the terms of the max expression of the scalarization
    # if the objective symbol belongs to the class I^diamond, then do not add it
    # to the max expression
    max_terms = [
        (
            f"({obj.symbol}_min - {(nadir_point[obj.symbol])}) / "
            f"({nadir_point[obj.symbol]} - {(corrected_rp[obj.symbol])})"
        )
        for obj in problem.objectives
        if obj.symbol not in free_to_change
    ]

    # an empty max term would produce the malformed expression 'Max()'
    if not max_terms:
        msg = (
            f"The given reference point {reference_point} makes every objective free to change. "
            "At least one reference point component must be better than the respective nadir point component."
        )
        raise ScalarizationError(msg)

    max_expr = ", ".join(max_terms)

    # define the augmentation term
    aug_expr = " + ".join(
        [
            (  # Technically delta should be included (according to the paper), but I'm a rebel and don't want to add it
                f"{obj.symbol}_min / ({nadir_point[obj.symbol]} - {ideal_point[obj.symbol]})"
                if obj.symbol in free_to_change
                else f"{obj.symbol}_min / ({nadir_point[obj.symbol]} - {corrected_rp[obj.symbol]})"
            )
            for obj in problem.objectives
        ]
    )

    target_expr = f"{Op.MAX}({max_expr}) + {rho}*({aug_expr})"
    scalarization = ScalarizationFunction(
        name="GUESS scalarization objective function",
        symbol=symbol,
        func=target_expr,
        is_linear=False,
        is_convex=False,
        is_twice_differentiable=False,
    )

    return problem.add_scalarization(scalarization), symbol

add_nimbus_sf_diff

add_nimbus_sf_diff(
    problem: Problem,
    symbol: str,
    classifications: dict[str, tuple[str, float | None]],
    current_objective_vector: dict[str, float],
    ideal: dict[str, float] | None = None,
    nadir: dict[str, float] | None = None,
    delta: float = 1e-06,
    rho: float = 1e-06,
) -> tuple[Problem, str]

Implements the differentiable variant of the NIMBUS scalarization function.

\[\begin{align*} \min \quad & \alpha + \rho \sum_{i =1}^k \frac{f_i(\mathbf{x})}{z_i^{nad} - z_i^{\star\star}} \\ \text{s.t.} \quad & \frac{f_i(\mathbf{x}) - z_i^*}{z_i^{nad} - z_i^{\star\star}} - \alpha \leq 0 \quad & \forall i \in I^< \\ & \frac{f_i(\mathbf{x}) - \hat{z}_i}{z_i^{nad} - z_i^{\star\star}} - \alpha \leq 0 \quad & \forall i \in I^\leq \\ & f_i(\mathbf{x}) - f_i(\mathbf{x_c}) \leq 0 \quad & \forall i \in I^< \cup I^\leq \cup I^= \\ & f_i(\mathbf{x}) - \epsilon_i \leq 0 \quad & \forall i \in I^\geq \\ & \mathbf{x} \in S, \end{align*}\]

where \(f_i\) are objective functions, \(f_i(\mathbf{x_c})\) is a component of the current objective vector, \(\hat{z}_i\) is an aspiration level, \(\varepsilon_i\) is a reservation level, \(z_i^\star\) is a component of the ideal point, \(z_i^{\star\star} = z_i^\star - \delta\) is a component of the utopian point, \(z_i^\text{nad}\) is a component of the nadir point, \(\rho\) is a small scalar, \(S\) is the feasible solution space of the problem (i.e., it means the other constraints of the problem being solved should be accounted for as well), and \(\alpha\) is an auxiliary variable.

The \(I\)-sets are related to the classifications given to each objective function value in respect to the current objective vector (e.g., by a decision maker). They are as follows:

  • \(I^{<}\): values that should improve,
  • \(I^{\leq}\): values that should improve until a given aspiration level \(\hat{z}_i\),
  • \(I^{=}\): values that are fine as they are,
  • \(I^{\geq}\): values that can be impaired until some reservation level \(\varepsilon_i\), and
  • \(I^{\diamond}\): values that are allowed to change freely (not present explicitly in this scalarization function).

The aspiration levels and the reservation levels are supplied for each classification, when relevant, in the argument classifications as follows:

classifications = {
    "f_1": ("<", None),
    "f_2": ("<=", 42.1),
    "f_3": (">=", 22.2),
    "f_4": ("0", None)
    }

Here, we have assumed four objective functions. The key of the dict is a function's symbol, and the tuple consists of a pair where the left element is the classification (self explanatory, '0' is for objective values that may change freely), the right element is either None or an aspiration or a reservation level depending on the classification.

References

Miettinen, K., & Mäkelä, M. M. (2002). On scalarizing functions in multiobjective optimization. OR Spectrum, 24(2), 193-213.

Parameters:

Name Type Description Default
problem Problem

the problem to be scalarized.

required
symbol str

the symbol given to the scalarization function, i.e., target of the optimization.

required
classifications dict[str, tuple[str, float | None]]

a dict, where the key is a symbol of an objective function, and the value is a tuple with a classification and an aspiration or a reservation level, or None, depending on the classification. See above for an explanation.

required
current_objective_vector dict[str, float]

the current objective vector that corresponds to a Pareto optimal solution. The classifications are assumed to have been given with respect to this vector.

required
ideal dict[str, float]

ideal point values. If not given, attempt will be made to calculate ideal point from problem.

None
nadir dict[str, float]

nadir point values. If not given, attempt will be made to calculate nadir point from problem.

None
delta float

a small scalar used to define the utopian point. Defaults to 0.000001.

1e-06
rho float

a small scalar used in the augmentation term. Defaults to 0.000001.

1e-06

Returns:

Type Description
tuple[Problem, str]

tuple[Problem, str]: a tuple with a copy of the problem with the added scalarizations and the symbol of the scalarization.

Source code in desdeo/tools/scalarization.py
def add_nimbus_sf_diff(
    problem: Problem,
    symbol: str,
    classifications: dict[str, tuple[str, float | None]],
    current_objective_vector: dict[str, float],
    ideal: dict[str, float] | None = None,
    nadir: dict[str, float] | None = None,
    delta: float = 0.000001,
    rho: float = 0.000001,
) -> Problem:
    r"""Implements the differentiable variant of the NIMBUS scalarization function.

    \begin{align*}
        \min \quad & \alpha + \rho \sum_{i =1}^k \frac{f_i(\mathbf{x})}{z_i^{nad} - z_i^{\star\star}} \\
        \text{s.t.} \quad & \frac{f_i(\mathbf{x}) - z_i^*}{z_i^{nad} - z_i^{\star\star}} -
            \alpha \leq 0 \quad & \forall i \in I^< \\
        & \frac{f_i(\mathbf{x}) - \hat{z}_i}{z_i^{nad} - z_i^{\star\star}} - \alpha \leq 0 \quad &
        \forall i \in I^\leq \\
        & f_i(\mathbf{x}) - f_i(\mathbf{x_c}) \leq 0 \quad & \forall i \in I^< \cup I^\leq \cup I^= \\
        & f_i(\mathbf{x}) - \epsilon_i \leq 0 \quad & \forall i \in I^\geq \\
        & \mathbf{x} \in S,
    \end{align*}

    where $f_i$ are objective functions, $f_i(\mathbf{x_c})$ is a component of
    the current objective function, $\hat{z}_i$ is an aspiration level,
    $\varepsilon_i$ is a reservation level, $z_i^\star$ is a component of the
    ideal point, $z_i^{\star\star} = z_i^\star - \delta$ is a component of the
    utopian point, $z_i^\text{nad}$ is a component of the nadir point, $\rho$ is
    a small scalar, $S$ is the feasible solution space of the problem (i.e., it
    means the other constraints of the problem being solved should be accounted
    for as well), and $\alpha$ is an auxiliary variable.

    The $I$-sets are related to the classifications given to each objective function value
    in respect to  the current objective vector (e.g., by a decision maker). They
    are as follows:

    - $I^{<}$: values that should improve,
    - $I^{\leq}$: values that should improve until a given aspiration level $\hat{z}_i$,
    - $I^{=}$: values that are fine as they are,
    - $I^{\geq}$: values that can be impaired until some reservation level $\varepsilon_i$, and
    - $I^{\diamond}$: values that are allowed to change freely (not present explicitly in this scalarization function).

    The aspiration levels and the reservation levels are supplied for each classification, when relevant, in
    the argument `classifications` as follows:

    ```python
    classifications = {
        "f_1": ("<", None),
        "f_2": ("<=", 42.1),
        "f_3": (">=", 22.2),
        "f_4": ("0", None)
        }
    ```

    Here, we have assumed four objective functions. The key of the dict is a function's symbol, and the tuple
    consists of a pair where the left element is the classification (self explanatory, '0' is for objective values
    that may change freely), the right element is either `None` or an aspiration or a reservation level
    depending on the classification.

    References:
        Miettinen, K., & Mäkelä, M. M. (2002). On scalarizing functions in
            multiobjective optimization. OR Spectrum, 24(2), 193-213.


    Args:
        problem (Problem): the problem to be scalarized.
        symbol (str): the symbol given to the scalarization function, i.e., target of the optimization.
        classifications (dict[str, tuple[str, float  |  None]]): a dict, where the key is a symbol
            of an objective function, and the value is a tuple with a classification and an aspiration
            or a reservation level, or `None`, depending on the classification. See above for an
            explanation.
        current_objective_vector (dict[str, float]): the current objective vector that corresponds to
            a Pareto optimal solution. The classifications are assumed to been given in respect to
            this vector.
        ideal (dict[str, float], optional): ideal point values. If not given, attempt will be made
            to calculate ideal point from problem.
        nadir (dict[str, float], optional): nadir point values. If not given, attempt will be made
            to calculate nadir point from problem.
        delta (float, optional): a small scalar used to define the utopian point. Defaults to 0.000001.
        rho (float, optional): a small scalar used in the augmentation term. Defaults to 0.000001.

    Returns:
        tuple[Problem, str]: a tuple with a copy of the problem with the added scalarizations and the
            symbol of the scalarization.
    """
    # check that classifications have been provided for all objective functions
    if not objective_dict_has_all_symbols(problem, classifications):
        msg = (
            f"The given classifications {classifications} do not define "
            "a classification for all the objective functions."
        )
        raise ScalarizationError(msg)

    # check that at least one objective function is allowed to be improved and one is
    # allowed to worsen
    if not any(classifications[obj.symbol][0] in ["<", "<="] for obj in problem.objectives) or not any(
        classifications[obj.symbol][0] in [">=", "0"] for obj in problem.objectives
    ):
        msg = (
            f"The given classifications {classifications} should allow at least one objective function value "
            "to improve and one to worsen."
        )
        raise ScalarizationError(msg)

    # check if ideal point is specified
    # if not specified, try to calculate corrected ideal point
    if ideal is not None:
        ideal_point = ideal
    elif problem.get_ideal_point() is not None:
        ideal_point = get_corrected_ideal(problem)
    else:
        msg = "Ideal point not defined!"
        raise ScalarizationError(msg)

    # check if nadir point is specified
    # if not specified, try to calculate corrected nadir point
    if nadir is not None:
        nadir_point = nadir
    elif problem.get_nadir_point() is not None:
        nadir_point = get_corrected_nadir(problem)
    else:
        msg = "Nadir point not defined!"
        raise ScalarizationError(msg)

    # define the auxiliary variable
    alpha = Variable(
        name="alpha",
        symbol="_alpha",
        variable_type=VariableTypeEnum.real,
        lowerbound=-float("Inf"),
        upperbound=float("Inf"),
        initial_value=1.0,
    )

    # define the objective function of the scalarization
    aug_expr = " + ".join(
        [
            f"{obj.symbol}_min / ({nadir_point[obj.symbol]} - {ideal_point[obj.symbol] - delta})"
            for obj in problem.objectives
        ]
    )

    target_expr = f"_alpha + {rho}*" + f"({aug_expr})"
    scalarization = ScalarizationFunction(
        name="NIMBUS scalarization objective function",
        symbol=symbol,
        func=target_expr,
        is_linear=problem.is_linear,
        is_convex=problem.is_convex,
        is_twice_differentiable=problem.is_twice_differentiable,
    )

    constraints = []

    # create all the constraints
    for obj in problem.objectives:
        _symbol = obj.symbol
        match classifications[_symbol]:
            case ("<", _):
                expr = (
                    f"({_symbol}_min - {ideal_point[_symbol]}) / "
                    f"({nadir_point[_symbol] - (ideal_point[_symbol] - delta)}) - _alpha"
                )
                constraints.append(
                    Constraint(
                        name=f"improvement constraint for {_symbol}",
                        symbol=f"{_symbol}_lt",
                        func=expr,
                        cons_type=ConstraintTypeEnum.LTE,
                        is_linear=problem.is_linear,
                        is_convex=problem.is_convex,
                        is_twice_differentiable=problem.is_twice_differentiable,
                    )
                )

                # if obj is to be maximized, then the current objective vector value needs to be multiplied by -1
                expr = f"{_symbol}_min - {current_objective_vector[_symbol]}{' * -1' if obj.maximize else ''}"
                constraints.append(
                    Constraint(
                        name=f"stay at least equal constraint for {_symbol}",
                        symbol=f"{_symbol}_eq",
                        func=expr,
                        cons_type=ConstraintTypeEnum.LTE,
                        is_linear=problem.is_linear,
                        is_convex=problem.is_convex,
                        is_twice_differentiable=problem.is_twice_differentiable,
                    )
                )
            case ("<=", aspiration):
                # if obj is to be maximized, then the current reservation value needs to be multiplied by -1
                expr = (
                    f"({_symbol}_min - {aspiration}{' * -1' if obj.maximize else ''}) / "
                    f"({nadir_point[_symbol]} - {ideal_point[_symbol] - delta}) - _alpha"
                )
                constraints.append(
                    Constraint(
                        name=f"improvement until constraint for {_symbol}",
                        symbol=f"{_symbol}_lte",
                        func=expr,
                        cons_type=ConstraintTypeEnum.LTE,
                        is_linear=problem.is_linear,
                        is_convex=problem.is_convex,
                        is_twice_differentiable=problem.is_twice_differentiable,
                    )
                )

                # if obj is to be maximized, then the current objective vector value needs to be multiplied by -1
                expr = f"{_symbol}_min - {current_objective_vector[_symbol]}{' * -1' if obj.maximize else ''}"
                constraints.append(
                    Constraint(
                        name=f"stay at least equal constraint for {_symbol}",
                        symbol=f"{_symbol}_eq",
                        func=expr,
                        cons_type=ConstraintTypeEnum.LTE,
                        is_linear=problem.is_linear,
                        is_convex=problem.is_convex,
                        is_twice_differentiable=problem.is_twice_differentiable,
                    )
                )
            case ("=", _):
                # if obj is to be maximized, then the current objective vector value needs to be multiplied by -1
                expr = f"{_symbol}_min - {current_objective_vector[_symbol]}{' * -1' if obj.maximize else ''}"
                constraints.append(
                    Constraint(
                        name=f"stay at least equal constraint for {_symbol}",
                        symbol=f"{_symbol}_eq",
                        func=expr,
                        cons_type=ConstraintTypeEnum.LTE,
                        is_linear=problem.is_linear,
                        is_convex=problem.is_convex,
                        is_twice_differentiable=problem.is_twice_differentiable,
                    )
                )
            case (">=", reservation):
                # if obj is to be maximized, then the reservation value needs to be multiplied by -1
                expr = f"{_symbol}_min - {reservation}{' * -1' if obj.maximize else ''}"
                constraints.append(
                    Constraint(
                        name=f"worsen until constriant for {_symbol}",
                        symbol=f"{_symbol}_gte",
                        func=expr,
                        cons_type=ConstraintTypeEnum.LTE,
                        is_linear=problem.is_linear,
                        is_convex=problem.is_convex,
                        is_twice_differentiable=problem.is_twice_differentiable,
                    )
                )
            case ("0", _):
                # not relevant for this scalarization
                pass
            case (c, _):
                msg = (
                    f"Warning! The classification {c} was supplied, but it is not supported."
                    "Must be one of ['<', '<=', '0', '=', '>=']"
                )

    # add the auxiliary variable, scalarization, and constraints
    _problem = problem.add_variables([alpha])
    _problem = _problem.add_scalarization(scalarization)
    return _problem.add_constraints(constraints), symbol

add_nimbus_sf_nondiff

add_nimbus_sf_nondiff(
    problem: Problem,
    symbol: str,
    classifications: dict[str, tuple[str, float | None]],
    current_objective_vector: dict[str, float],
    ideal: dict[str, float] | None = None,
    nadir: dict[str, float] | None = None,
    delta: float = 1e-06,
    rho: float = 1e-06,
) -> Problem

Implements the non-differentiable variant of the NIMBUS scalarization function.

\[\begin{align*} \underset{\mathbf{x}}{\min} \underset{\substack{j \in I^\leq \\i \in I^<}}{\max} &\left[ \frac{f_i(\mathbf{x}) - z_i^\star}{z_i^\text{nad} - z_i^{\star\star}}, \frac{f_j(\mathbf{x}) - \hat{z}_j}{z_j^\text{nad} - z_j^{\star\star}} \right] +\rho \sum_{i =1}^k \frac{f_i(\mathbf{x})}{z_i^{nad} - z_i^{\star\star}} \\ \text{s.t.} \quad & f_i(\mathbf{x}) - f_i(\mathbf{x}^c) \leq 0\quad&\forall i \in I^< \cup I^\leq \cup I^=,\\ & f_i(\mathbf{x}) - \epsilon_i \leq 0\quad&\forall i \in I^\geq,\\ & \mathbf{x} \in S, \end{align*}\]

where \(f_i\) are objective functions, \(f_i(\mathbf{x_c})\) is a component of the current objective vector, \(\hat{z}_i\) is an aspiration level, \(\varepsilon_i\) is a reservation level, \(z_i^\star\) is a component of the ideal point, \(z_i^{\star\star} = z_i^\star - \delta\) is a component of the utopian point, \(z_i^\text{nad}\) is a component of the nadir point, \(\rho\) is a small scalar, and \(S\) is the feasible solution space of the problem (i.e., it means the other constraints of the problem being solved should be accounted for as well).

The \(I\)-sets are related to the classifications given to each objective function value in respect to the current objective vector (e.g., by a decision maker). They are as follows:

  • \(I^{<}\): values that should improve,
  • \(I^{\leq}\): values that should improve until a given aspiration level \(\hat{z}_i\),
  • \(I^{=}\): values that are fine as they are,
  • \(I^{\geq}\): values that can be impaired until some reservation level \(\varepsilon_i\), and
  • \(I^{\diamond}\): values that are allowed to change freely (not present explicitly in this scalarization function).

The aspiration levels and the reservation levels are supplied for each classification, when relevant, in the argument classifications as follows:

classifications = {
    "f_1": ("<", None),
    "f_2": ("<=", 42.1),
    "f_3": (">=", 22.2),
    "f_4": ("0", None)
    }

Here, we have assumed four objective functions. The key of the dict is a function's symbol, and the tuple consists of a pair where the left element is the classification (self explanatory, '0' is for objective values that may change freely), the right element is either None or an aspiration or a reservation level depending on the classification.

References

Miettinen, K., & Mäkelä, M. M. (2002). On scalarizing functions in multiobjective optimization. OR Spectrum, 24(2), 193-213.

Parameters:

Name Type Description Default
problem Problem

the problem to be scalarized.

required
symbol str

the symbol given to the scalarization function, i.e., target of the optimization.

required
classifications dict[str, tuple[str, float | None]]

a dict, where the key is a symbol of an objective function, and the value is a tuple with a classification and an aspiration or a reservation level, or None, depending on the classification. See above for an explanation.

required
current_objective_vector dict[str, float]

the current objective vector that corresponds to a Pareto optimal solution. The classifications are assumed to have been given with respect to this vector.

required
ideal dict[str, float]

optional ideal point values. If not given, attempt will be made to calculate ideal point from problem.

None
nadir dict[str, float]

optional nadir point values. If not given, attempt will be made to calculate nadir point from problem.

None
delta float

a small scalar used to define the utopian point. Defaults to 0.000001.

1e-06
rho float

a small scalar used in the augmentation term. Defaults to 0.000001.

1e-06

Returns:

Type Description
Problem

tuple[Problem, str]: a tuple with a copy of the problem with the added scalarizations and the symbol of the scalarization.

Source code in desdeo/tools/scalarization.py
def add_nimbus_sf_nondiff(
    problem: Problem,
    symbol: str,
    classifications: dict[str, tuple[str, float | None]],
    current_objective_vector: dict[str, float],
    ideal: dict[str, float] | None = None,
    nadir: dict[str, float] | None = None,
    delta: float = 0.000001,
    rho: float = 0.000001,
) -> Problem:
    r"""Implements the non-differentiable variant of the NIMBUS scalarization function.

    \begin{align*}
        \underset{\mathbf{x}}{\min}
        \underset{\substack{j \in I^\leq \\i \in I^<}}{\max}
        &\left[ \frac{f_i(\mathbf{x}) - z_i^\star}{z_i^\text{nad} - z_i^{\star\star}},
        \frac{f_j(\mathbf{x}) - \hat{z}_j}{z_j^\text{nad} - x_j^{\star\star}} \right]
        +\rho \sum_{i =1}^k \frac{f_i(\mathbf{x})}{z_i^{nad} - z_i^{\star\star}} \\
        \text{s.t.} \quad & f_i(\mathbf{x}) - f_i(\mathbf{x}^c) \leq 0\quad&\forall i \in I^< \cup I^\leq \cup I^=,\\
        & f_i(\mathbf{x}) - \epsilon_i \leq 0\quad&\forall i \in I^\geq,\\
        & \mathbf{x} \in S,
    \end{align*}

    where $f_i$ are objective functions, $f_i(\mathbf{x_c})$ is a component of
    the current objective function, $\hat{z}_i$ is an aspiration level,
    $\varepsilon_i$ is a reservation level, $z_i^\star$ is a component of the
    ideal point, $z_i^{\star\star} = z_i^\star - \delta$ is a component of the
    utopian point, $z_i^\text{nad}$ is a component of the nadir point, $\rho$ is
    a small scalar, and $S$ is the feasible solution space of the problem (i.e., it
    means the other constraints of the problem being solved should be accounted
    for as well).

    The $I$-sets are related to the classifications given to each objective function value
    in respect to  the current objective vector (e.g., by a decision maker). They
    are as follows:

    - $I^{<}$: values that should improve,
    - $I^{\leq}$: values that should improve until a given aspiration level $\hat{z}_i$,
    - $I^{=}$: values that are fine as they are,
    - $I^{\geq}$: values that can be impaired until some reservation level $\varepsilon_i$, and
    - $I^{\diamond}$: values that are allowed to change freely (not present explicitly in this scalarization function).

    The aspiration levels and the reservation levels are supplied for each classification, when relevant, in
    the argument `classifications` as follows:

    ```python
    classifications = {
        "f_1": ("<", None),
        "f_2": ("<=", 42.1),
        "f_3": (">=", 22.2),
        "f_4": ("0", None)
        }
    ```

    Here, we have assumed four objective functions. The key of the dict is a function's symbol, and the tuple
    consists of a pair where the left element is the classification (self explanatory, '0' is for objective values
    that may change freely), the right element is either `None` or an aspiration or a reservation level
    depending on the classification.

    References:
        Miettinen, K., & Mäkelä, M. M. (2002). On scalarizing functions in
            multiobjective optimization. OR Spectrum, 24(2), 193-213.


    Args:
        problem (Problem): the problem to be scalarized.
        symbol (str): the symbol given to the scalarization function, i.e., target of the optimization.
        classifications (dict[str, tuple[str, float  |  None]]): a dict, where the key is a symbol
            of an objective function, and the value is a tuple with a classification and an aspiration
            or a reservation level, or `None`, depending on the classification. See above for an
            explanation.
        current_objective_vector (dict[str, float]): the current objective vector that corresponds to
            a Pareto optimal solution. The classifications are assumed to been given in respect to
            this vector.
        ideal (dict[str, float], optional): optional ideal point values. If not given, attempt will be made
            to calculate ideal point from problem.
        nadir (dict[str, float], optional): optional nadir point values. If not given, attempt will be made
            to calculate nadir point from problem.
        delta (float, optional): a small scalar used to define the utopian point. Defaults to 0.000001.
        rho (float, optional): a small scalar used in the augmentation term. Defaults to 0.000001.

    Returns:
        tuple[Problem, str]: a tuple with a copy of the problem with the added scalarizations and the
            symbol of the scalarization.
    """
    # check that classifications have been provided for all objective functions
    if not objective_dict_has_all_symbols(problem, classifications):
        msg = (
            f"The given classifications {classifications} do not define "
            "a classification for all the objective functions."
        )
        raise ScalarizationError(msg)

    # check that at least one objective function is allowed to be improved and one is
    # allowed to worsen
    if not any(classifications[obj.symbol][0] in ["<", "<="] for obj in problem.objectives) or not any(
        classifications[obj.symbol][0] in [">=", "0"] for obj in problem.objectives
    ):
        msg = (
            f"The given classifications {classifications} should allow at least one objective function value "
            "to improve and one to worsen."
        )
        raise ScalarizationError(msg)

    # check if ideal point is specified
    # if not specified, try to calculate corrected ideal point
    if ideal is not None:
        ideal_point = ideal
    elif problem.get_ideal_point() is not None:
        ideal_point = get_corrected_ideal(problem)
    else:
        msg = "Ideal point not defined!"
        raise ScalarizationError(msg)

    # check if nadir point is specified
    # if not specified, try to calculate corrected nadir point
    if nadir is not None:
        nadir_point = nadir
    elif problem.get_nadir_point() is not None:
        nadir_point = get_corrected_nadir(problem)
    else:
        msg = "Nadir point not defined!"
        raise ScalarizationError(msg)

    corrected_current_point = flip_maximized_objective_values(problem, current_objective_vector)

    # max term and constraints
    max_args = []
    constraints = []

    for obj in problem.objectives:
        _symbol = obj.symbol
        match classifications[_symbol]:
            case ("<", _):
                max_expr = (
                    f"({_symbol}_min - {ideal_point[_symbol]}) / "
                    f"({nadir_point[_symbol]} - {ideal_point[_symbol] - delta})"
                )
                max_args.append(max_expr)

                con_expr = f"{_symbol}_min - {corrected_current_point[_symbol]}"
                constraints.append(
                    Constraint(
                        name=f"improvement constraint for {_symbol}",
                        symbol=f"{_symbol}_lt",
                        func=con_expr,
                        cons_type=ConstraintTypeEnum.LTE,
                        is_linear=problem.is_linear,
                        is_convex=problem.is_convex,
                        is_twice_differentiable=problem.is_twice_differentiable,
                    )
                )

            case ("<=", aspiration):
                # if obj is to be maximized, then the current reservation value needs to be multiplied by -1
                max_expr = (
                    f"({_symbol}_min - {aspiration * -1 if obj.maximize else aspiration}) / "
                    f"({nadir_point[_symbol]} - {ideal_point[_symbol] - delta})"
                )
                max_args.append(max_expr)

                con_expr = f"{_symbol}_min - {corrected_current_point[_symbol]}"
                constraints.append(
                    Constraint(
                        name=f"improvement until constraint for {_symbol}",
                        symbol=f"{_symbol}_lte",
                        func=con_expr,
                        cons_type=ConstraintTypeEnum.LTE,
                        is_linear=problem.is_linear,
                        is_convex=problem.is_convex,
                        is_twice_differentiable=problem.is_twice_differentiable,
                    )
                )

            case ("=", _):
                con_expr = f"{_symbol}_min - {corrected_current_point[_symbol]}"
                constraints.append(
                    Constraint(
                        name=f"Stay at least as good constraint for {_symbol}",
                        symbol=f"{_symbol}_eq",
                        func=con_expr,
                        cons_type=ConstraintTypeEnum.LTE,
                        is_linear=problem.is_linear,
                        is_convex=problem.is_convex,
                        is_twice_differentiable=problem.is_twice_differentiable,
                    )
                )
            case (">=", reservation):
                con_expr = f"{_symbol}_min - {-1 * reservation if obj.maximize else reservation}"
                constraints.append(
                    Constraint(
                        name=f"Worsen until constraint for {_symbol}",
                        symbol=f"{_symbol}_gte",
                        func=con_expr,
                        cons_type=ConstraintTypeEnum.LTE,
                        is_linear=problem.is_linear,
                        is_convex=problem.is_convex,
                        is_twice_differentiable=problem.is_twice_differentiable,
                    )
                )
            case ("0", _):
                # not relevant for this scalarization
                pass
            case (c, _):
                msg = (
                    f"Warning! The classification {c} was supplied, but it is not supported."
                    "Must be one of ['<', '<=', '0', '=', '>=']"
                )

    max_expr = f"Max({','.join(max_args)})"

    # define the objective function of the scalarization
    aug_expr = " + ".join(
        [
            f"{obj.symbol}_min / ({nadir_point[obj.symbol]} - {ideal_point[obj.symbol] - delta})"
            for obj in problem.objectives
        ]
    )

    target_expr = f"{max_expr} + {rho}*({aug_expr})"
    scalarization = ScalarizationFunction(
        name="NIMBUS scalarization objective function",
        symbol=symbol,
        func=target_expr,
        is_linear=False,
        is_convex=False,
        is_twice_differentiable=False,
    )

    _problem = problem.add_scalarization(scalarization)
    return _problem.add_constraints(constraints), symbol

add_objective_as_scalarization

add_objective_as_scalarization(
    problem: Problem, symbol: str, objective_symbol: str
) -> tuple[Problem, str]

Creates a scalarization where one of the problem's objective functions is optimized.

The scalarization is defined as follows:

\[\begin{equation} \operatorname{min}_{\mathbf{x} \in S} f_t(\mathbf{x}), \end{equation}\]

where \(f_t(\mathbf{x})\) is the objective function to be minimized.

Parameters:

Name Type Description Default
problem Problem

the problem to which the scalarization should be added.

required
symbol str

the symbol to reference the added scalarization function.

required
objective_symbol str

the symbol of the objective function to be optimized.

required

Raises:

Type Description
ScalarizationError

the given objective_symbol does not exist in the problem.

Returns:

Type Description
tuple[Problem, str]

tuple[Problem, str]: A tuple containing a copy of the problem with the scalarization function added, and the symbol of the added scalarization function.

Source code in desdeo/tools/scalarization.py
def add_objective_as_scalarization(problem: Problem, symbol: str, objective_symbol: str) -> tuple[Problem, str]:
    r"""Adds a scalarization that optimizes a single objective function of the problem.

    The scalarization reads

    \begin{equation}
        \operatorname{min}_{\mathbf{x} \in S} f_t(\mathbf{x}),
    \end{equation}

    with $f_t(\mathbf{x})$ being the objective function chosen to be minimized.

    Args:
        problem (Problem): the problem the scalarization is added to.
        symbol (str): the symbol by which the added scalarization function is referenced.
        objective_symbol (str): the symbol of the objective function that is to be optimized.

    Raises:
        ScalarizationError: no objective with the symbol `objective_symbol` is found in the problem.

    Returns:
        tuple[Problem, str]: a copy of the problem with the scalarization function added, together with
            the symbol of the added scalarization function.
    """
    # look the objective up without copying; a missing objective is reported as an error
    target_objective = problem.get_objective(objective_symbol, copy=False)
    if target_objective is None:
        msg = f"The given objective symbol {objective_symbol} is not defined in the problem.."
        raise ScalarizationError(msg)

    # the scalarization is the '_min' form of the objective; it takes over the objective's properties
    single_objective_sf = ScalarizationFunction(
        name=f"Objective {objective_symbol}",
        symbol=symbol,
        func=["Multiply", 1, f"{objective_symbol}_min"],
        is_linear=target_objective.is_linear,
        is_convex=target_objective.is_convex,
        is_twice_differentiable=target_objective.is_twice_differentiable,
    )

    scalarized_problem = problem.add_scalarization(single_objective_sf)
    return scalarized_problem, symbol

add_stom_sf_diff

add_stom_sf_diff(
    problem: Problem,
    symbol: str,
    reference_point: dict[str, float],
    ideal: dict[str, float] | None = None,
    rho: float = 1e-06,
    delta: float = 1e-06,
) -> tuple[Problem, str]

Adds the differentiable variant of the STOM scalarizing function.

\[\begin{align*} \min \quad & \alpha + \rho \sum_{i=1}^k \frac{f_i(\mathbf{x})}{\bar{z}_i - z_i^{\star\star}} \\ \text{s.t.} \quad & \frac{f_i(\mathbf{x}) - z_i^{\star\star}}{\bar{z}_i - z_i^{\star\star}} - \alpha \leq 0 \quad & \forall i = 1,\dots,k\\ & \mathbf{x} \in S, \end{align*}\]

where \(f_i\) are objective functions, \(z_i^{\star\star} = z_i^\star - \delta\) is a component of the utopian point, \(\bar{z}_i\) is a component of the reference point, \(\rho\) and \(\delta\) are small scalar values, \(S\) is the feasible solution space of the original problem, and \(\alpha\) is an auxiliary variable.

References

H. Nakayama, Y. Sawaragi, Satisficing trade-off method for multiobjective programming, in: M. Grauer, A.P. Wierzbicki (Eds.), Interactive Decision Analysis, Springer Verlag, Berlin, 1984, pp. 113-122.

Parameters:

Name Type Description Default
problem Problem

the problem the scalarization is added to.

required
symbol str

the symbol given to the added scalarization.

required
reference_point dict[str, float]

a dict with keys corresponding to objective function symbols and values to reference point components, i.e., aspiration levels.

required
ideal dict[str, float]

ideal point values. If not given, attempt will be made to calculate ideal point from problem.

None
rho float

a small scalar value to scale the sum in the objective function of the scalarization. Defaults to 1e-6.

1e-06
delta float

a small scalar value to define the utopian point. Defaults to 1e-6.

1e-06

Returns:

Type Description
tuple[Problem, str]

tuple[Problem, str]: a tuple with the copy of the problem with the added scalarization and the symbol of the added scalarization.

Source code in desdeo/tools/scalarization.py
def add_stom_sf_diff(
    problem: Problem,
    symbol: str,
    reference_point: dict[str, float],
    ideal: dict[str, float] | None = None,
    rho: float = 1e-6,
    delta: float = 1e-6,
) -> tuple[Problem, str]:
    r"""Adds the differentiable variant of the STOM scalarizing function.

    \begin{align*}
        \min \quad & \alpha + \rho \sum_{i=1}^k \frac{f_i(\mathbf{x})}{\bar{z}_i - z_i^{\star\star}} \\
        \text{s.t.} \quad & \frac{f_i(\mathbf{x}) - z_i^{\star\star}}{\bar{z}_i
        - z_i^{\star\star}} - \alpha \leq 0 \quad & \forall i = 1,\dots,k\\
        & \mathbf{x} \in S,
    \end{align*}

    where $f_i$ are objective functions, $z_i^{\star\star} = z_i^\star - \delta$ is
    a component of the utopian point, $\bar{z}_i$ is a component of the reference point,
    $\rho$ and $\delta$ are small scalar values, $S$ is the feasible solution
    space of the original problem,  and $\alpha$ is an auxiliary variable.

    The same corrected reference point is used in the augmentation term of the objective
    and in the constraints, so that both are normalized by the same range.

    References:
        H. Nakayama, Y. Sawaragi, Satisficing trade-off method for
            multiobjective programming, in: M. Grauer, A.P. Wierzbicki (Eds.),
            Interactive Decision Analysis, Springer Verlag, Berlin, 1984, pp.
            113-122.

    Args:
        problem (Problem): the problem the scalarization is added to.
        symbol (str): the symbol given to the added scalarization.
        reference_point (dict[str, float]): a dict with keys corresponding to objective
            function symbols and values to reference point components, i.e.,
            aspiration levels.
        ideal (dict[str, float], optional): ideal point values. If not given, attempt will be made
            to calculate ideal point from problem.
        rho (float, optional): a small scalar value to scale the sum in the objective
            function of the scalarization. Defaults to 1e-6.
        delta (float, optional): a small scalar value to define the utopian point. Defaults to 1e-6.

    Raises:
        ScalarizationError: the reference point is missing a value for some objective, or no ideal
            point is given and none is defined in the problem.

    Returns:
        tuple[Problem, str]: a tuple with the copy of the problem with the added
            scalarization and the symbol of the added scalarization.
    """
    # check reference point
    if not objective_dict_has_all_symbols(problem, reference_point):
        msg = f"The give reference point {reference_point} is missing value for one or more objectives."
        raise ScalarizationError(msg)

    # check if ideal point is specified
    # if not specified, try to calculate corrected ideal point
    if ideal is not None:
        ideal_point = ideal
    elif problem.get_ideal_point() is not None:
        ideal_point = get_corrected_ideal(problem)
    else:
        msg = "Ideal point not defined!"
        raise ScalarizationError(msg)

    # NOTE(review): presumably negates the components of maximized objectives so that they
    # match the '_min' symbols used in the expressions below -- confirm against the helper
    corrected_rp = flip_maximized_objective_values(problem, reference_point)

    # (z_bar_i - z_i^*) + delta, i.e., z_bar_i - z_i^{**}. The corrected reference point is used
    # for both the augmentation term and the constraints; mixing in the uncorrected reference
    # point would give the two terms different normalizations.
    denominators = {
        obj.symbol: (corrected_rp[obj.symbol] - ideal_point[obj.symbol]) + delta for obj in problem.objectives
    }

    # define the auxiliary variable
    alpha = Variable(
        name="alpha",
        symbol="_alpha",
        variable_type=VariableTypeEnum.real,
        lowerbound=-float("Inf"),
        upperbound=float("Inf"),
        initial_value=1.0,
    )

    # define the objective function of the scalarization
    aug_expr = " + ".join([f"{obj.symbol}_min / ({denominators[obj.symbol]})" for obj in problem.objectives])

    target_expr = f"_alpha + {rho}*({aug_expr})"
    scalarization = ScalarizationFunction(
        name="STOM scalarization objective function",
        symbol=symbol,
        func=target_expr,
        is_twice_differentiable=problem.is_twice_differentiable,
        is_linear=problem.is_linear,
        is_convex=problem.is_convex,
    )

    # one constraint per objective replaces the max term: each normalized distance must stay below alpha
    constraints = []

    for obj in problem.objectives:
        expr = f"({obj.symbol}_min - {ideal_point[obj.symbol] - delta}) / ({denominators[obj.symbol]}) - _alpha"
        constraints.append(
            Constraint(
                name=f"Max constraint for {obj.symbol}",
                symbol=f"{obj.symbol}_maxcon",
                func=expr,
                cons_type=ConstraintTypeEnum.LTE,
                is_twice_differentiable=obj.is_twice_differentiable,
                is_linear=obj.is_linear,
                is_convex=obj.is_convex,
            )
        )

    _problem = problem.add_variables([alpha])
    _problem = _problem.add_scalarization(scalarization)
    return _problem.add_constraints(constraints), symbol

add_stom_sf_nondiff

add_stom_sf_nondiff(
    problem: Problem,
    symbol: str,
    reference_point: dict[str, float],
    ideal: dict[str, float] | None = None,
    rho: float = 1e-06,
    delta: float = 1e-06,
) -> tuple[Problem, str]

Adds the non-differentiable variant of the STOM scalarizing function.

\[\begin{align*} \underset{\mathbf{x}}{\min} \quad & \underset{i=1,\dots,k}{\max}\left[ \frac{f_i(\mathbf{x}) - z_i^{\star\star}}{\bar{z}_i - z_i^{\star\star}} \right] + \rho \sum_{i=1}^k \frac{f_i(\mathbf{x})}{\bar{z}_i - z_i^{\star\star}} \\ \text{s.t.}\quad & \mathbf{x} \in S, \end{align*}\]

where \(f_i\) are objective functions, \(z_i^{\star\star} = z_i^\star - \delta\) is a component of the utopian point, \(\bar{z}_i\) is a component of the reference point, \(\rho\) and \(\delta\) are small scalar values, and \(S\) is the feasible solution space of the original problem.

References

H. Nakayama, Y. Sawaragi, Satisficing trade-off method for multiobjective programming, in: M. Grauer, A.P. Wierzbicki (Eds.), Interactive Decision Analysis, Springer Verlag, Berlin, 1984, pp. 113-122.

Parameters:

Name Type Description Default
problem Problem

the problem the scalarization is added to.

required
symbol str

the symbol given to the added scalarization.

required
reference_point dict[str, float]

a dict with keys corresponding to objective function symbols and values to reference point components, i.e., aspiration levels.

required
ideal dict[str, float]

ideal point values. If not given, attempt will be made to calculate ideal point from problem.

None
rho float

a small scalar value to scale the sum in the objective function of the scalarization. Defaults to 1e-6.

1e-06
delta float

a small scalar value to define the utopian point. Defaults to 1e-6.

1e-06

Returns:

Type Description
tuple[Problem, str]

tuple[Problem, str]: a tuple with the copy of the problem with the added scalarization and the symbol of the added scalarization.

Source code in desdeo/tools/scalarization.py
def add_stom_sf_nondiff(
    problem: Problem,
    symbol: str,
    reference_point: dict[str, float],
    ideal: dict[str, float] | None = None,
    rho: float = 1e-6,
    delta: float = 1e-6,
) -> tuple[Problem, str]:
    r"""Adds the non-differentiable variant of the STOM scalarizing function.

    \begin{align*}
        \underset{\mathbf{x}}{\min} \quad & \underset{i=1,\dots,k}{\max}\left[
            \frac{f_i(\mathbf{x}) - z_i^{\star\star}}{\bar{z}_i - z_i^{\star\star}}
            \right]
            + \rho \sum_{i=1}^k \frac{f_i(\mathbf{x})}{\bar{z}_i - z_i^{\star\star}} \\
        \text{s.t.}\quad & \mathbf{x} \in S,
    \end{align*}

    where $f_i$ are objective functions, $z_i^{\star\star} = z_i^\star - \delta$ is
    a component of the utopian point, $\bar{z}_i$ is a component of the reference point,
    $\rho$ and $\delta$ are small scalar values, and $S$ is the feasible solution
    space of the original problem.

    The same corrected reference point is used in the max term and in the augmentation
    term, so that both are normalized by the same range.

    References:
        H. Nakayama, Y. Sawaragi, Satisficing trade-off method for
            multiobjective programming, in: M. Grauer, A.P. Wierzbicki (Eds.),
            Interactive Decision Analysis, Springer Verlag, Berlin, 1984, pp.
            113-122.

    Args:
        problem (Problem): the problem the scalarization is added to.
        symbol (str): the symbol given to the added scalarization.
        reference_point (dict[str, float]): a dict with keys corresponding to objective
            function symbols and values to reference point components, i.e.,
            aspiration levels.
        ideal (dict[str, float], optional): ideal point values. If not given, attempt will be made
            to calculate ideal point from problem.
        rho (float, optional): a small scalar value to scale the sum in the objective
            function of the scalarization. Defaults to 1e-6.
        delta (float, optional): a small scalar value to define the utopian point. Defaults to 1e-6.

    Raises:
        ScalarizationError: the reference point is missing a value for some objective, or no ideal
            point is given and none is defined in the problem.

    Returns:
        tuple[Problem, str]: a tuple with the copy of the problem with the added
            scalarization and the symbol of the added scalarization.
    """
    # check reference point
    if not objective_dict_has_all_symbols(problem, reference_point):
        msg = f"The give reference point {reference_point} is missing value for one or more objectives."
        raise ScalarizationError(msg)

    # check if ideal point is specified
    # if not specified, try to calculate corrected ideal point
    if ideal is not None:
        ideal_point = ideal
    elif problem.get_ideal_point() is not None:
        ideal_point = get_corrected_ideal(problem)
    else:
        msg = "Ideal point not defined!"
        raise ScalarizationError(msg)

    # NOTE(review): presumably negates the components of maximized objectives so that they
    # match the '_min' symbols used in the expressions below -- confirm against the helper
    corrected_rp = flip_maximized_objective_values(problem, reference_point)

    # (z_bar_i - z_i^*) + delta, i.e., z_bar_i - z_i^{**}. The corrected reference point is used
    # for both the max term and the augmentation term; mixing in the uncorrected reference
    # point would give the two terms different normalizations.
    denominators = {
        obj.symbol: (corrected_rp[obj.symbol] - ideal_point[obj.symbol]) + delta for obj in problem.objectives
    }

    # define the objective function of the scalarization
    max_expr = ", ".join(
        [
            f"({obj.symbol}_min - {ideal_point[obj.symbol] - delta}) / ({denominators[obj.symbol]})"
            for obj in problem.objectives
        ]
    )
    aug_expr = " + ".join([f"{obj.symbol}_min / ({denominators[obj.symbol]})" for obj in problem.objectives])

    target_expr = f"{Op.MAX}({max_expr}) + {rho}*({aug_expr})"
    scalarization = ScalarizationFunction(
        name="STOM scalarization objective function",
        symbol=symbol,
        func=target_expr,
        is_linear=False,
        is_convex=False,
        is_twice_differentiable=False,
    )

    return problem.add_scalarization(scalarization), symbol

add_weighted_sums

add_weighted_sums(
    problem: Problem, symbol: str, weights: dict[str, float]
) -> tuple[Problem, str]

Add the weighted sums scalarization to a problem with the given weights.

It is assumed that the weights add to 1.

The scalarization is defined as follows:

\[\begin{equation} \begin{aligned} & \mathcal{S}_\text{WS}(F(\mathbf{x});\mathbf{w}) = \sum_{i=1}^{k} w_i f_i(\mathbf{x}) \\ & \text{s.t.} \sum_{i=1}^{k} w_i = 1, \end{aligned} \end{equation}\]

where \(\mathbf{w} = [w_1,\dots,w_k]\) are the weights and \(k\) is the number of objective functions.

Warning

The weighted sums scalarization is often not capable of finding most Pareto optimal solutions when optimized. It is advised to utilize some better scalarization functions.

Parameters:

Name Type Description Default
problem Problem

the problem to which the scalarization should be added.

required
symbol str

the symbol to reference the added scalarization function.

required
weights dict[str, float]

the weights. For the method to work, the weights should sum to 1. However, this is not a condition that is checked.

required

Raises:

Type Description
ScalarizationError

if the weights are missing any of the objective components.

Returns:

Type Description
tuple[Problem, str]

tuple[Problem, str]: A tuple containing a copy of the problem with the scalarization function added, and the symbol of the added scalarization function.

Source code in desdeo/tools/scalarization.py
def add_weighted_sums(problem: Problem, symbol: str, weights: dict[str, float]) -> tuple[Problem, str]:
    r"""Adds a weighted sums scalarization with the given weights to a problem.

    The weights are assumed to add up to 1.

    The scalarization reads

    \begin{equation}
        \begin{aligned}
        & \mathcal{S}_\text{WS}(F(\mathbf{x});\mathbf{w}) = \sum_{i=1}^{k} w_i f_i(\mathbf{x}) \\
        & \text{s.t.} \sum_{i=1}^{k} w_i = 1,
        \end{aligned}
    \end{equation}

    with $\mathbf{w} = [w_1,\dots,w_k]$ being the weights and $k$ the number of
    objective functions.

    Warning:
        The weighted sums scalarization is often not capable of finding most Pareto optimal
            solutions when optimized. It is advised to utilize some better scalarization
            functions.

    Args:
        problem (Problem): the problem the scalarization is added to.
        symbol (str): the symbol by which the added scalarization function is referenced.
        weights (dict[str, float]): the weights, keyed by objective function symbol. They
            should sum to 1, but this condition is not checked.

    Raises:
        ScalarizationError: a weight is missing for one or more of the objectives.

    Returns:
        tuple[Problem, str]: a copy of the problem with the scalarization function added, together with
            the symbol of the added scalarization function.
    """
    objective_symbols = [objective.symbol for objective in problem.objectives]

    # every objective must have a weight
    if any(obj_symbol not in weights for obj_symbol in objective_symbols):
        msg = f"The given weight vector {weights} does not have a component defined for all the objectives."
        raise ScalarizationError(msg)

    # one '(w_i * f_i_min)' term per objective, summed up
    weighted_sum_expr = " + ".join(f"({weights[obj_symbol]} * {obj_symbol}_min)" for obj_symbol in objective_symbols)

    # the scalarization takes over the properties of the problem
    weighted_sums_sf = ScalarizationFunction(
        name="Weighted sums scalarization function",
        symbol=symbol,
        func=weighted_sum_expr,
        is_linear=problem.is_linear,
        is_convex=problem.is_convex,
        is_twice_differentiable=problem.is_twice_differentiable,
    )

    scalarized_problem = problem.add_scalarization(weighted_sums_sf)
    return scalarized_problem, symbol

create_epsilon_constraints_json

create_epsilon_constraints_json(
    problem: Problem,
    objective_symbol: str,
    epsilons: dict[str, float],
) -> tuple[list[str | int | float], list[str]]

Creates JSON expressions for an epsilon constraints scalarization and constraints.

It is assumed that the epsilon values have been given in a format where each objective is to be minimized.

Warning

To be deprecated.

Parameters:

Name Type Description Default
problem Problem

the problem to scalarize.

required
objective_symbol str

the objective used as the objective in the epsilon constraint scalarization.

required
epsilons dict[str, float]

the epsilon constraint values in a dict with each key being an objective's symbol.

required

Raises:

Type Description
ScalarizationError

objective_symbol not found in problem definition.

Returns:

Type Description
tuple[list[str | int | float], list[str]]

tuple[list, list]: the first element is the expression of the scalarized objective expressed in MathJSON format. The second element is a list of expressions of the constraints expressed in MathJSON format. The constraints are in less than or equal format.

Source code in desdeo/tools/scalarization.py
def create_epsilon_constraints_json(
    problem: Problem, objective_symbol: str, epsilons: dict[str, float]
) -> tuple[list[str | int | float], list[str]]:
    """Builds the MathJSON pieces of an epsilon constraints scalarization.

    The objective identified by `objective_symbol` becomes the expression to be
    optimized, and one constraint expression is generated for each of the
    remaining objectives of the problem. The epsilons are assumed to be given
    in a format where each objective is to be minimized.

    Warning:
        To be deprecated.

    Args:
        problem (Problem): the problem to scalarize.
        objective_symbol (str): the symbol of the objective that is kept as the
            objective in the epsilon constraint scalarization.
        epsilons (dict[str, float]): the epsilon constraint values, keyed by the
            symbols of the objectives.

    Raises:
        ScalarizationError: `objective_symbol` is not the symbol of any objective
            in the problem.

    Returns:
        tuple[list, list]: the first element is the MathJSON expression of the scalarized
            objective. The second element is a list with the MathJSON expressions of the
            constraints. The constraints are in less than or equal format.
    """
    symbols = [obj.symbol for obj in problem.objectives]

    if objective_symbol not in symbols:
        msg = f"The given objective symbol {objective_symbol} should be one of {symbols}."
        raise ScalarizationError(msg)

    # Leave out the first occurrence of the chosen objective; everything that
    # remains is turned into a constraint.
    chosen_at = symbols.index(objective_symbol)
    constrained = symbols[:chosen_at] + symbols[chosen_at + 1 :]

    # Each constraint has the shape `f_min - epsilon`; the epsilons are looked up
    # by symbol and must be given assuming minimization of each objective.
    constraints = [["Add", f"{symbol}_min", ["Negate", epsilons[symbol]]] for symbol in constrained]

    objective_expr = ["Multiply", 1, f"{objective_symbol}_min"]

    return objective_expr, constraints

objective_dict_has_all_symbols

objective_dict_has_all_symbols(
    problem: Problem, obj_dict: dict[str, float]
) -> bool

Check that a dict has all the objective function symbols of a problem as its keys.

Parameters:

Name Type Description Default
problem Problem

the problem with the objective symbols.

required
obj_dict dict[str, float]

a dict that should have a key for each objective symbol.

required

Returns:

Name Type Description
bool bool

whether all the symbols are present or not.

Source code in desdeo/tools/scalarization.py
def objective_dict_has_all_symbols(problem: Problem, obj_dict: dict[str, float]) -> bool:
    """Tell whether every objective symbol of a problem is a key of a dict.

    Keys in the dict that do not correspond to any objective are ignored.

    Args:
        problem (Problem): the problem whose objective symbols are looked for.
        obj_dict (dict[str, float]): the dict expected to have one key per objective symbol.

    Returns:
        bool: True when no objective symbol is missing from the dict, False otherwise.
    """
    for objective in problem.objectives:
        if objective.symbol not in obj_dict:
            # one missing symbol is enough to fail the check
            return False

    return True

Gurobipy solver interfaces

Defines solver interfaces for gurobipy.

GurobipySolver

Bases: BaseSolver

Creates a gurobipy solver that utilizes gurobi's own Python implementation.

Source code in desdeo/tools/gurobipy_solver_interfaces.py
class GurobipySolver(BaseSolver):
    """A solver that hands the problem over to gurobi through its own Python implementation, gurobipy."""

    def __init__(self, problem: Problem, options: dict[str, any] | None = None):
        """Sets up the solver from a problem and, optionally, Gurobi parameters.

        Unlike with Pyomo you do not need to have gurobi installed on your system
        for this to work. Suitable for solving mixed-integer linear and quadratic optimization
        problems.

        Args:
            problem (Problem): the problem to be solved.
            options (dict[str,any]): Gurobi parameters to set, given as parameter name and
                value pairs. When None, Gurobi's output and console logging are turned off
                instead. You probably don't need to set any of these and can just use the defaults.
                For available parameters see https://www.gurobi.com/documentation/current/refman/parameters.html
        """
        self.evaluator = GurobipyEvaluator(problem)
        self.problem = problem

        model = self.evaluator.model

        if options is None:
            # Nothing was requested by the caller: keep Gurobi quiet.
            model.setParam("OutputFlag", 0)  # no Gurobi output
            model.setParam("LogToConsole", 0)  # no Gurobi logging to the console
            return

        # The given parameters are applied as they are; the quiet defaults above are not added.
        for param_name, param_value in options.items():
            model.setParam(param_name, param_value)

    def solve(self, target: str) -> SolverResults:
        """Optimizes the function identified by the given symbol.

        Args:
            target (str): the symbol of the function to be optimized, and which is
                defined in the problem given when initializing the solver.

        Returns:
            SolverResults: the results of the optimization.
        """
        evaluator = self.evaluator
        evaluator.set_optimization_target(target)
        evaluator.model.optimize()

        return parse_gurobipy_optimizer_results(self.problem, evaluator)

__init__

__init__(
    problem: Problem, options: dict[str, any] | None = None
)

The solver is initialized by supplying a problem and options.

Unlike with Pyomo you do not need to have gurobi installed on your system for this to work. Suitable for solving mixed-integer linear and quadratic optimization problems.

Parameters:

Name Type Description Default
problem Problem

the problem to be solved.

required
options dict[str, any]

Dictionary of Gurobi parameters to set. You probably don't need to set any of these and can just use the defaults. For available parameters see https://www.gurobi.com/documentation/current/refman/parameters.html

None
Source code in desdeo/tools/gurobipy_solver_interfaces.py
def __init__(self, problem: Problem, options: dict[str, any] | None = None):
    """Sets up the solver from a problem and, optionally, Gurobi parameters.

    Unlike with Pyomo you do not need to have gurobi installed on your system
    for this to work. Suitable for solving mixed-integer linear and quadratic optimization
    problems.

    Args:
        problem (Problem): the problem to be solved.
        options (dict[str,any]): Gurobi parameters to set, given as parameter name and
            value pairs. When None, Gurobi's output and console logging are turned off
            instead. You probably don't need to set any of these and can just use the defaults.
            For available parameters see https://www.gurobi.com/documentation/current/refman/parameters.html
    """
    self.evaluator = GurobipyEvaluator(problem)
    self.problem = problem

    model = self.evaluator.model

    if options is None:
        # Nothing was requested by the caller: keep Gurobi quiet.
        model.setParam("OutputFlag", 0)  # no Gurobi output
        model.setParam("LogToConsole", 0)  # no Gurobi logging to the console
        return

    # The given parameters are applied as they are; the quiet defaults above are not added.
    for param_name, param_value in options.items():
        model.setParam(param_name, param_value)

solve

solve(target: str) -> SolverResults

Solve the problem for the given target.

Parameters:

Name Type Description Default
target str

the symbol of the function to be optimized, and which is defined in the problem given when initializing the solver.

required

Returns:

Name Type Description
SolverResults SolverResults

the results of the optimization.

Source code in desdeo/tools/gurobipy_solver_interfaces.py
def solve(self, target: str) -> SolverResults:
    """Optimizes the function identified by the given symbol.

    Args:
        target (str): the symbol of the function to be optimized, and which is
            defined in the problem given when initializing the solver.

    Returns:
        SolverResults: the results of the optimization.
    """
    evaluator = self.evaluator
    evaluator.set_optimization_target(target)
    evaluator.model.optimize()

    return parse_gurobipy_optimizer_results(self.problem, evaluator)

PersistentGurobipySolver

Bases: PersistentSolver

A persistent solver class utilizing gurobipy.

Use this instead of create_gurobipy_solver when re-initializing the solver every time the problem is changed is not practical.

Source code in desdeo/tools/gurobipy_solver_interfaces.py
class PersistentGurobipySolver(PersistentSolver):
    """A persistent solver class utilizing gurobipy.

    Use this instead of create_gurobipy_solver when re-initializing the
    solver every time the problem is changed is not practical.
    """

    evaluator: GurobipyEvaluator

    def __init__(self, problem: Problem, options: dict[str, any] | None = None):
        """Initializer for the persistent solver.

        Args:
            problem (Problem): the problem to be transformed in a GurobipyModel.
            options (dict[str,any]): Dictionary of Gurobi parameters to set.
                You probably don't need to set any of these and can just use the defaults.
                For available parameters see https://www.gurobi.com/documentation/current/refman/parameters.html
        """
        self.problem = problem
        self.evaluator = GurobipyEvaluator(problem)
        if options is not None:
            for key, value in options.items():
                self.evaluator.model.setParam(key, value)

    def add_constraint(self, constraint: Constraint | list[Constraint]) -> gp.Constr | list[gp.Constr]:
        """Add one or more constraint expressions to the solver.

        If adding a lot of constraints or dealing with a large model, this function
        may end up being very slow compared to adding the constraints to the model
        stored in the evaluator directly.

        Args:
            constraint (Constraint): the constraint function expression or a list of
                constraint function expressions.

        Raises:
            GurobipyEvaluatorError: when an unsupported constraint type is encountered.

        Returns:
            gurobipy.Constr: The gurobipy constraint that was added or a list of gurobipy
                constraints if the constraint argument was a list.
        """
        if isinstance(constraint, list):
            # Build an actual list; the added constraints are returned in the same
            # order as the given constraints.
            return [self.evaluator.add_constraint(cons) for cons in constraint]

        return self.evaluator.add_constraint(constraint)

    def add_objective(self, objective: Objective | list[Objective]):
        """Adds an objective function expression to the solver.

        Does not yet add any actual gurobipy optimization objectives, only adds them to the dict
        containing the expressions of the objectives. The objective expressions are stored in the
        evaluator and the evaluator must add the appropriate gurobipy objective before solving.

        Args:
            objective (Objective): an objective function expression or a list of objective function
                expressions to be added.
        """
        if not isinstance(objective, list):
            objective = [objective]

        for obj in objective:
            self.evaluator.add_objective(obj)

    def add_scalarization_function(self, scalarization: ScalarizationFunction | list[ScalarizationFunction]):
        """Adds a scalarization expression to the solver.

        Scalarizations work identically to objectives, except they are stored in a different
        dict in the evaluator. If you want to solve the problem using a scalarization, the
        evaluator needs to set it as an optimization target first.

        Args:
            scalarization (ScalarizationFunction): A scalarization function or a list of
                scalarization functions to be added.
        """
        if not isinstance(scalarization, list):
            scalarization = [scalarization]

        for scal in scalarization:
            self.evaluator.add_scalarization_function(scal)

    def add_variable(
        self, variable: Variable | TensorVariable | list[Variable] | list[TensorVariable]
    ) -> gp.Var | gp.MVar | list[gp.Var] | list[gp.MVar]:
        """Add one or more variables to the solver.

        If adding a lot of variables or dealing with a large model, this function
        may end up being very slow compared to adding the variables to the model
        stored in the evaluator directly.

        Args:
            variable (Variable): The definition of the variable or a list of variables to be added.

        Raises:
            GurobipyEvaluatorError: when a problem in extracting the variables is encountered.
                I.e., the variables are of a non supported type.

        Returns:
            gp.Var: the variable that was added to the model or a list of variables if
                variable argument was a list.
        """
        if isinstance(variable, list):
            # Build an actual list; the added variables are returned in the same
            # order as the given variable definitions.
            return [self.evaluator.add_variable(var) for var in variable]

        return self.evaluator.add_variable(variable)

    def remove_constraint(self, symbol: str | list[str]):
        """Removes a constraint from the solver.

        If removing a lot of constraints or dealing with a very large model this function
        may be slow because of the model.update() calls. Accessing the model stored in the
        evaluator directly may be faster.

        Args:
            symbol (str): a str representing the symbol of the constraint to be removed.
                Can also be a list of multiple symbols.
        """
        if not isinstance(symbol, list):
            symbol = [symbol]
        for s in symbol:
            self.evaluator.remove_constraint(s)

    def remove_variable(self, symbol: str | list[str]):
        """Removes a variable from the model.

        If removing a lot of variables or dealing with a very large model this function
        may be slow because of the model.update() calls. Accessing the model stored in
        the evaluator directly may be faster.

        Args:
            symbol (str): a str representing the symbol of the variable to be removed.
                Can also be a list of multiple symbols.
        """
        # The argument is forwarded as given (single symbol or list) to the evaluator.
        self.evaluator.remove_variable(symbol)

    def solve(self, target: str) -> SolverResults:
        """Solves the current problem with the specified target.

        Args:
            target (str): a str representing the symbol of the target function.

        Returns:
            SolverResults: The results of the solver
        """
        self.evaluator.set_optimization_target(target)
        self.evaluator.model.optimize()
        return parse_gurobipy_optimizer_results(self.problem, self.evaluator)

__init__

__init__(
    problem: Problem, options: dict[str, any] | None = None
)

Initializer for the persistent solver.

Parameters:

Name Type Description Default
problem Problem

the problem to be transformed in a GurobipyModel.

required
options dict[str, any]

Dictionary of Gurobi parameters to set. You probably don't need to set any of these and can just use the defaults. For available parameters see https://www.gurobi.com/documentation/current/refman/parameters.html

None
Source code in desdeo/tools/gurobipy_solver_interfaces.py
def __init__(self, problem: Problem, options: dict[str, any] | None = None):
    """Creates the evaluator for the problem and applies any given Gurobi parameters.

    Args:
        problem (Problem): the problem to be transformed in a GurobipyModel.
        options (dict[str,any]): Gurobi parameters to set, given as parameter name and
            value pairs. When None, no parameters are set here.
            You probably don't need to set any of these and can just use the defaults.
            For available parameters see https://www.gurobi.com/documentation/current/refman/parameters.html
    """
    self.problem = problem
    self.evaluator = GurobipyEvaluator(problem)

    if options is None:
        # no parameters to apply
        return

    for param_name, param_value in options.items():
        self.evaluator.model.setParam(param_name, param_value)

add_constraint

add_constraint(
    constraint: Constraint | list[Constraint],
) -> gp.Constr | list[gp.Constr]

Add one or more constraint expressions to the solver.

If adding a lot of constraints or dealing with a large model, this function may end up being very slow compared to adding the constraints to the model stored in the evaluator directly.

Parameters:

Name Type Description Default
constraint Constraint

the constraint function expression or a list of constraint function expressions.

required

Raises:

Type Description
GurobipyEvaluatorError

when an unsupported constraint type is encountered.

Returns:

Type Description
Constr | list[Constr]

gurobipy.Constr: The gurobipy constraint that was added or a list of gurobipy constraints if the constraint argument was a list.

Source code in desdeo/tools/gurobipy_solver_interfaces.py
def add_constraint(self, constraint: Constraint | list[Constraint]) -> gp.Constr | list[gp.Constr]:
    """Add one or more constraint expressions to the solver.

    If adding a lot of constraints or dealing with a large model, this function
    may end up being very slow compared to adding the constraints to the model
    stored in the evaluator directly.

    Args:
        constraint (Constraint): the constraint function expression or a list of
            constraint function expressions.

    Raises:
        GurobipyEvaluatorError: when an unsupported constraint type is encountered.

    Returns:
        gurobipy.Constr: The gurobipy constraint that was added or a list of gurobipy
            constraints if the constraint argument was a list.
    """
    if isinstance(constraint, list):
        # Build an actual list; the added constraints are returned in the same
        # order as the given constraints.
        return [self.evaluator.add_constraint(cons) for cons in constraint]

    return self.evaluator.add_constraint(constraint)

add_objective

add_objective(objective: Objective | list[Objective])

Adds an objective function expression to the solver.

Does not yet add any actual gurobipy optimization objectives, only adds them to the dict containing the expressions of the objectives. The objective expressions are stored in the evaluator and the evaluator must add the appropriate gurobipy objective before solving.

Parameters:

Name Type Description Default
objective Objective

an objective function expression or a list of objective function expressions to be added.

required
Source code in desdeo/tools/gurobipy_solver_interfaces.py
def add_objective(self, objective: Objective | list[Objective]):
    """Registers one or more objective function expressions with the evaluator.

    Does not yet add any actual gurobipy optimization objectives, only adds them to the dict
    containing the expressions of the objectives. The objective expressions are stored in the
    evaluator and the evaluator must add the appropriate gurobipy objective before solving.

    Args:
        objective (Objective): an objective function expression or a list of objective function
            expressions to be added.
    """
    # Treat a single objective as a list with one element.
    to_add = objective if isinstance(objective, list) else [objective]

    for single_objective in to_add:
        self.evaluator.add_objective(single_objective)

add_scalarization_function

add_scalarization_function(
    scalarization: ScalarizationFunction
    | list[ScalarizationFunction],
)

Adds a scalarization expression to the solver.

Scalarizations work identically to objectives, except they are stored in a different dict in the evaluator. If you want to solve the problem using a scalarization, the evaluator needs to set it as an optimization target first.

Parameters:

Name Type Description Default
scalarization ScalarizationFunction

A scalarization function or a list of scalarization functions to be added.

required
Source code in desdeo/tools/gurobipy_solver_interfaces.py
def add_scalarization_function(self, scalarization: ScalarizationFunction | list[ScalarizationFunction]):
    """Registers one or more scalarization function expressions with the evaluator.

    Scalarizations work identically to objectives, except they are stored in a different
    dict in the evaluator. If you want to solve the problem using a scalarization, the
    evaluator needs to set it as an optimization target first.

    Args:
        scalarization (ScalarizationFunction): A scalarization function or a list of
            scalarization functions to be added.
    """
    # Treat a single scalarization as a list with one element.
    to_add = scalarization if isinstance(scalarization, list) else [scalarization]

    for single_scalarization in to_add:
        self.evaluator.add_scalarization_function(single_scalarization)

add_variable

add_variable(
    variable: Variable
    | TensorVariable
    | list[Variable]
    | list[TensorVariable],
) -> gp.Var | gp.MVar | list[gp.Var] | list[gp.MVar]

Add one or more variables to the solver.

If adding a lot of variables or dealing with a large model, this function may end up being very slow compared to adding the variables to the model stored in the evaluator directly.

Parameters:

Name Type Description Default
variable Variable

The definition of the variable or a list of variables to be added.

required

Raises:

Type Description
GurobipyEvaluatorError

when a problem in extracting the variables is encountered. I.e., the variables are of a non supported type.

Returns:

Type Description
Var | MVar | list[Var] | list[MVar]

gp.Var: the variable that was added to the model or a list of variables if variable argument was a list.

Source code in desdeo/tools/gurobipy_solver_interfaces.py
def add_variable(
    self, variable: Variable | TensorVariable | list[Variable] | list[TensorVariable]
) -> gp.Var | gp.MVar | list[gp.Var] | list[gp.MVar]:
    """Add one or more variables to the solver.

    If adding a lot of variables or dealing with a large model, this function
    may end up being very slow compared to adding the variables to the model
    stored in the evaluator directly.

    Args:
        variable (Variable): The definition of the variable or a list of variables to be added.

    Raises:
        GurobipyEvaluatorError: when a problem in extracting the variables is encountered.
            I.e., the variables are of a non supported type.

    Returns:
        gp.Var: the variable that was added to the model or a list of variables if
            variable argument was a list.
    """
    if isinstance(variable, list):
        # Build an actual list; the added variables are returned in the same
        # order as the given variable definitions.
        return [self.evaluator.add_variable(var) for var in variable]

    return self.evaluator.add_variable(variable)

remove_constraint

remove_constraint(symbol: str | list[str])

Removes a constraint from the solver.

If removing a lot of constraints or dealing with a very large model this function may be slow because of the model.update() calls. Accessing the model stored in the evaluator directly may be faster.

Parameters:

Name Type Description Default
symbol str

a str representing the symbol of the constraint to be removed. Can also be a list of multiple symbols.

required
Source code in desdeo/tools/gurobipy_solver_interfaces.py
def remove_constraint(self, symbol: str | list[str]):
    """Removes one or more constraints, identified by their symbols, from the solver.

    If removing a lot of constraints or dealing with a very large model this function
    may be slow because of the model.update() calls. Accessing the model stored in the
    evaluator directly may be faster.

    Args:
        symbol (str): a str representing the symbol of the constraint to be removed.
            Can also be a list of multiple symbols.
    """
    # Treat a single symbol as a list with one element.
    symbols = symbol if isinstance(symbol, list) else [symbol]

    for constraint_symbol in symbols:
        self.evaluator.remove_constraint(constraint_symbol)

remove_variable

remove_variable(symbol: str | list[str])

Removes a variable from the model.

If removing a lot of variables or dealing with a very large model this function may be slow because of the model.update() calls. Accessing the model stored in the evaluator directly may be faster.

Parameters:

Name Type Description Default
symbol str

a str representing the symbol of the variable to be removed. Can also be a list of multiple symbols.

required
Source code in desdeo/tools/gurobipy_solver_interfaces.py
def remove_variable(self, symbol: str | list[str]):
    """Asks the evaluator to remove the variable(s) with the given symbol(s) from the model.

    If removing a lot of variables or dealing with a very large model this function
    may be slow because of the model.update() calls. Accessing the model stored in
    the evaluator directly may be faster.

    Args:
        symbol (str): a str representing the symbol of the variable to be removed.
            Can also be a list of multiple symbols.
    """
    # The argument is handed to the evaluator as given, whether it is a single
    # symbol or a list of symbols.
    evaluator = self.evaluator
    evaluator.remove_variable(symbol)

solve

solve(target: str) -> SolverResults

Solves the current problem with the specified target.

Parameters:

Name Type Description Default
target str

a str representing the symbol of the target function.

required

Returns:

Name Type Description
SolverResults SolverResults

The results of the solver

Source code in desdeo/tools/gurobipy_solver_interfaces.py
def solve(self, target: str) -> SolverResults:
    """Optimizes the current problem with respect to the given target.

    Args:
        target (str): a str representing the symbol of the target function.

    Returns:
        SolverResults: The results of the solver
    """
    evaluator = self.evaluator
    evaluator.set_optimization_target(target)
    evaluator.model.optimize()

    return parse_gurobipy_optimizer_results(self.problem, evaluator)

_constraint_contains_objective

_constraint_contains_objective(
    constraint: Constraint, objective_symbol: str
) -> bool

Check if a constraint's function contains a specific objective symbol.

Source code in desdeo/tools/gurobipy_solver_interfaces.py
def _constraint_contains_objective(constraint: Constraint, objective_symbol: str) -> bool:
    """Tell whether an objective symbol shows up in the function of a constraint.

    The check is a plain substring search in the string representation of
    `constraint.func`, so a symbol that is a prefix or a part of another symbol
    in the function is also reported as contained.

    Args:
        constraint (Constraint): the constraint whose function is inspected.
        objective_symbol (str): the objective symbol to look for.

    Returns:
        bool: True if the symbol occurs in the string form of the constraint's function.
    """
    return objective_symbol in str(constraint.func)

check_gurobi_license

check_gurobi_license()

Check if Gurobi is using a full license (not trial).

Returns:

Type Description

True if using full academic/commercial license

False if using trial license or no license found

Source code in desdeo/tools/gurobipy_solver_interfaces.py
def check_gurobi_license() -> bool:
    """Check if Gurobi is using a full license (not trial).

    A Gurobi environment is started with its output enabled while `sys.stdout` is
    temporarily replaced by an in-memory buffer. The captured text is then searched
    for the restricted license notice.

    Returns:
        True if using full academic/commercial license
        False if using trial license or no license found
    """
    captured_output = io.StringIO()
    original_stdout = sys.stdout

    try:
        sys.stdout = captured_output
        with gp.Env(empty=True) as env:
            env.setParam("OutputFlag", 1)
            env.start()
    except Exception:
        # Any failure to start the environment is treated as not having a full license.
        return False
    finally:
        # Always put the real stdout back, also when something that is not an
        # `Exception` (e.g., a KeyboardInterrupt) interrupts the check.
        sys.stdout = original_stdout

    return "Restricted license - for non-production use only" not in captured_output.getvalue()

parse_gurobipy_optimizer_results

parse_gurobipy_optimizer_results(
    problem: Problem, evaluator: GurobipyEvaluator
) -> SolverResults

Parses results from GurobipyEvaluator's model into DESDEO SolverResults.

Parameters:

Name Type Description Default
problem Problem

the problem being solved.

required
evaluator GurobipyEvaluator

the evaluator utilized to solve the problem.

required

Returns:

Name Type Description
SolverResults SolverResults

DESDEO solver results.

Source code in desdeo/tools/gurobipy_solver_interfaces.py
def parse_gurobipy_optimizer_results(problem: Problem, evaluator: GurobipyEvaluator) -> SolverResults:
    """Collects the values held by a GurobipyEvaluator into DESDEO SolverResults.

    The values are fetched from the evaluator and grouped according to the variables,
    objectives, constraints, extra functions, and scalarization functions defined in
    the problem. The status of the evaluator's model determines the success flag and
    the message of the results.

    Args:
        problem (Problem): the problem being solved.
        evaluator (GurobipyEvaluator): the evaluator utilized to solve the problem.

    Returns:
        SolverResults: DESDEO solver results.
    """
    values = evaluator.get_values()

    def _values_for(items):
        """Map the symbol of each item to its value; None when the problem defines no such items."""
        if items is None:
            return None
        return {item.symbol: values[item.symbol] for item in items}

    variable_values = {var.symbol: values[var.symbol] for var in problem.variables}
    objective_values = {obj.symbol: values[obj.symbol] for obj in problem.objectives}
    constraint_values = _values_for(problem.constraints)
    extra_func_values = _values_for(problem.extra_funcs)
    scalarization_values = _values_for(problem.scalarization_funcs)

    # Only constraints whose symbol contains the symbol of some objective are reported here.
    # NOTE(review): the reported entries are read from the same values as the constraint
    # values above -- confirm that this is what is intended for the multipliers.
    if problem.constraints is None:
        lagrange_multipliers = None
    else:
        objective_symbols = set(objective_values.keys())
        lagrange_multipliers = {
            con.symbol: values[con.symbol]
            for con in problem.constraints
            if any(obj_sym in con.symbol for obj_sym in objective_symbols)
        }

    model_status = evaluator.model.status
    success = model_status == gp.GRB.OPTIMAL

    # Statuses with a dedicated description; any other status is reported by its code.
    described_statuses = {
        gp.GRB.OPTIMAL: "Optimal solution found.",
        gp.GRB.INFEASIBLE: "Model is infeasible.",
        gp.GRB.UNBOUNDED: "Model is unbounded.",
        gp.GRB.INF_OR_UNBD: "Model is either infeasible or unbounded.",
    }
    status = described_statuses.get(model_status, f"Optimization ended with status: {model_status}")
    msg = f"Gurobipy solver status is: '{status}'"

    return SolverResults(
        optimal_variables=variable_values,
        optimal_objectives=objective_values,
        constraint_values=constraint_values,
        extra_func_values=extra_func_values,
        scalarization_values=scalarization_values,
        lagrange_multipliers=lagrange_multipliers,
        success=success,
        message=msg,
    )

Nevergrad solver interfaces

Solver interfaces to the optimization routines found in nevergrad.

For more info, see https://facebookresearch.github.io/nevergrad/index.html

_default_nevergrad_generic_options module-attribute

_default_nevergrad_generic_options = (
    NevergradGenericOptions()
)

The set of default options for nevergrad's NgOpt optimizer.

NevergradGenericOptions

Bases: BaseModel

Defines options to be passed to nevergrad's optimization routines.

Source code in desdeo/tools/ng_solver_interfaces.py
class NevergradGenericOptions(BaseModel):
    """Defines options to be passed to nevergrad's optimization routines."""

    budget: int = Field(description="The maximum number of allowed function evaluations.", default=100)
    """The maximum number of allowed function evaluations. Defaults to 100."""

    num_workers: int = Field(description="The maximum number of allowed parallel evaluations.", default=1)
    """The maximum number of allowed parallel evaluations. This is currently
    used to define the batch size when evaluating problems. Defaults to 1."""

    optimizer: Literal[*available_nevergrad_optimizers] = Field(
        description=(
            "The optimizer to be used. Must be one of `NGOpt`, `TwoPointDE`, `PortfolioDiscreteOnePlusOne`, "
            "`OnePlusOne`, `CMA`, `TBPSA`, `PSO`, `ScrHammersleySearchPlusMiddlePoint`, or `RandomSearch`. "
            "Defaults to `NGOpt`."
        ),
        default="NGOpt",
    )
    """The optimizer to be used. Must be one of `NGOpt`, `TwoPointsDE`, `PortfolioDiscreteOnePlusOne`,
    `OnePlusOne`, `CMA`, `TBPSA`, `PSO`, `ScrHammersleySearchPlusMiddlePoint`, or `RandomSearch`.
    Defaults to `NGOpt`."""

budget class-attribute instance-attribute

budget: int = Field(
    description="The maximum number of allowed function evaluations.",
    default=100,
)

The maximum number of allowed function evaluations. Defaults to 100.

num_workers class-attribute instance-attribute

num_workers: int = Field(
    description="The maximum number of allowed parallel evaluations.",
    default=1,
)

The maximum number of allowed parallel evaluations. This is currently used to define the batch size when evaluating problems. Defaults to 1.

optimizer class-attribute instance-attribute

optimizer: Literal[*available_nevergrad_optimizers,] = (
    Field(
        description="The optimizer to be used. Must be one of `NGOpt`, `TwoPointsDE`, `PortfolioDiscreteOnePlusOne`, `OnePlusOne`, `CMA`, `TBPSA`, `PSO`, `ScrHammersleySearchPlusMiddlePoint`, or `RandomSearch`. Defaults to `NGOpt`.",
        default="NGOpt",
    )
)

The optimizer to be used. Must be one of NGOpt, TwoPointsDE, PortfolioDiscreteOnePlusOne, OnePlusOne, CMA, TBPSA, PSO, ScrHammersleySearchPlusMiddlePoint, or RandomSearch. Defaults to NGOpt.

NevergradGenericSolver

Bases: BaseSolver

Creates a solver that utilizes optimization routines found in the nevergrad library.

Source code in desdeo/tools/ng_solver_interfaces.py
class NevergradGenericSolver(BaseSolver):
    """Creates a solver that utilizes optimization routines found in the nevergrad library."""

    def __init__(self, problem: Problem, options: NevergradGenericOptions | None = _default_nevergrad_generic_options):
        """Creates a solver that utilizes optimization routines found in the nevergrad library.

        These solvers are best utilized for black-box, gradient free optimization with
        computationally expensive function calls. Utilizing multiple workers is recommended
        (see `NevergradGenericOptions`) when function calls are heavily I/O bound.

        See https://facebookresearch.github.io/nevergrad/getting_started.html for further information
        on nevergrad and its solvers.

        References:
            Rapin, J., & Teytaud, O. (2018). Nevergrad - A gradient-free
                optimization platform. GitHub.
                https://GitHub.com/FacebookResearch/Nevergrad

        Args:
            problem (Problem): the problem to be solved.
            options (NevergradGenericOptions | None): options to be passed to the solver.
                If None, `_default_nevergrad_generic_options` are used.
                Defaults to `_default_nevergrad_generic_options`.

        """
        self.problem = problem
        self.options = options if options is not None else _default_nevergrad_generic_options
        self.evaluator = SympyEvaluator(problem)

    def solve(self, target: str) -> SolverResults:
        """Solve the problem for the given target.

        If the optimization raises, the failure is reported through the `success` and
        `message` entries of the parsed results, and the optimizer's current
        recommendation is used in place of the missing result of `minimize`.

        Args:
            target (str): the symbol of the objective function to be optimized.

        Returns:
            SolverResults: the results of the optimization.
        """
        parametrization = ng.p.Dict(
            **{
                var.symbol: ng.p.Scalar(
                    # sets the initial value of the variables, if None, then the
                    # mid-point of the lower and upper bounds is chosen as the
                    # initial value.
                    init=var.initial_value if var.initial_value is not None else (var.lowerbound + var.upperbound) / 2
                ).set_bounds(var.lowerbound, var.upperbound)
                for var in self.problem.variables
            }
        )

        # every option except the optimizer's name is forwarded to the optimizer's constructor
        optimizer = ng.optimizers.registry[self.options.optimizer](
            parametrization=parametrization, **self.options.model_dump(exclude={"optimizer"})
        )

        def objective(xs, t=target):
            return self.evaluator.evaluate_target(xs, t)

        # the symbol is bound as a default argument so that each callable keeps its own constraint
        constraint_violation = (
            None
            if self.problem.constraints is None
            else [lambda xs, t=con.symbol: self.evaluator.evaluate_target(xs, t) for con in self.problem.constraints]
        )

        try:
            if optimizer.num_workers == 1:
                # single thread
                recommendation = optimizer.minimize(objective, constraint_violation=constraint_violation)

            elif optimizer.num_workers > 1:
                # multiple workers
                with ThreadPoolExecutor(max_workers=optimizer.num_workers) as executor:
                    recommendation = optimizer.minimize(
                        objective,
                        constraint_violation=constraint_violation,
                        executor=executor,
                        batch_mode=False,
                    )

            else:
                # without this branch no recommendation would ever be assigned
                raise ValueError(f"num_workers must be at least 1, got {optimizer.num_workers}.")

            msg = f"Recommendation found by {self.options.optimizer}."
            success = True

        except Exception as e:
            msg = f"{self.options.optimizer} failed. Possible reason: {e}"
            success = False
            # `minimize` did not return, so `recommendation` would otherwise be unbound below;
            # ask the optimizer for whatever it currently considers its best candidate.
            recommendation = optimizer.provide_recommendation()

        result = {"recommendation": recommendation, "message": msg, "success": success}

        return parse_ng_results(result, self.problem, self.evaluator)

__init__

__init__(
    problem: Problem,
    options: NevergradGenericOptions
    | None = _default_nevergrad_generic_options,
)

Creates a solver that utilizes optimization routines found in the nevergrad library.

These solvers are best utilized for black-box, gradient free optimization with computationally expensive function calls. Utilizing multiple workers is recommended (see NevergradGenericOptions) when function calls are heavily I/O bound.

See https://facebookresearch.github.io/nevergrad/getting_started.html for further information on nevergrad and its solvers.

References

Rapin, J., & Teytaud, O. (2018). Nevergrad - A gradient-free optimization platform. GitHub. https://GitHub.com/FacebookResearch/Nevergrad

Parameters:

Name Type Description Default
problem Problem

the problem to be solved.

required
options NevergradGenericOptions | None

options to be passed to the solver. If None, _default_nevergrad_generic_options are used. Defaults to _default_nevergrad_generic_options.

_default_nevergrad_generic_options
Source code in desdeo/tools/ng_solver_interfaces.py
def __init__(self, problem: Problem, options: NevergradGenericOptions | None = _default_nevergrad_generic_options):
    """Set up a solver backed by one of the optimizers of the nevergrad library.

    Nevergrad's optimizers suit black-box, gradient free problems whose function
    calls are computationally expensive. When the calls are heavily I/O bound,
    using several workers (see `NevergradGenericOptions`) is recommended.

    Further information on nevergrad and its solvers is available at
    https://facebookresearch.github.io/nevergrad/getting_started.html

    References:
        Rapin, J., & Teytaud, O. (2018). Nevergrad - A gradient-free
            optimization platform. GitHub.
            https://GitHub.com/FacebookResearch/Nevergrad

    Args:
        problem (Problem): the problem to be solved.
        options (NevergradGenericOptions | None): the options handed to the solver.
            When None, `_default_nevergrad_generic_options` are used instead.
            Defaults to `_default_nevergrad_generic_options`.

    """
    # an explicit None selects the module's default options
    if options is None:
        options = _default_nevergrad_generic_options

    self.problem = problem
    self.options = options
    self.evaluator = SympyEvaluator(problem)

solve

solve(target: str) -> SolverResults

Solve the problem for the given target.

Parameters:

Name Type Description Default
target str

the symbol of the objective function to be optimized.

required

Returns:

Name Type Description
SolverResults SolverResults

the results of the optimization.

Source code in desdeo/tools/ng_solver_interfaces.py
def solve(self, target: str) -> SolverResults:
    """Solve the problem for the given target.

    If the optimization raises, the failure is reported through the `success` and
    `message` entries of the parsed results, and the optimizer's current
    recommendation is used in place of the missing result of `minimize`.

    Args:
        target (str): the symbol of the objective function to be optimized.

    Returns:
        SolverResults: the results of the optimization.
    """
    parametrization = ng.p.Dict(
        **{
            var.symbol: ng.p.Scalar(
                # sets the initial value of the variables, if None, then the
                # mid-point of the lower and upper bounds is chosen as the
                # initial value.
                init=var.initial_value if var.initial_value is not None else (var.lowerbound + var.upperbound) / 2
            ).set_bounds(var.lowerbound, var.upperbound)
            for var in self.problem.variables
        }
    )

    # every option except the optimizer's name is forwarded to the optimizer's constructor
    optimizer = ng.optimizers.registry[self.options.optimizer](
        parametrization=parametrization, **self.options.model_dump(exclude={"optimizer"})
    )

    def objective(xs, t=target):
        return self.evaluator.evaluate_target(xs, t)

    # the symbol is bound as a default argument so that each callable keeps its own constraint
    constraint_violation = (
        None
        if self.problem.constraints is None
        else [lambda xs, t=con.symbol: self.evaluator.evaluate_target(xs, t) for con in self.problem.constraints]
    )

    try:
        if optimizer.num_workers == 1:
            # single thread
            recommendation = optimizer.minimize(objective, constraint_violation=constraint_violation)

        elif optimizer.num_workers > 1:
            # multiple workers
            with ThreadPoolExecutor(max_workers=optimizer.num_workers) as executor:
                recommendation = optimizer.minimize(
                    objective,
                    constraint_violation=constraint_violation,
                    executor=executor,
                    batch_mode=False,
                )

        else:
            # without this branch no recommendation would ever be assigned
            raise ValueError(f"num_workers must be at least 1, got {optimizer.num_workers}.")

        msg = f"Recommendation found by {self.options.optimizer}."
        success = True

    except Exception as e:
        msg = f"{self.options.optimizer} failed. Possible reason: {e}"
        success = False
        # `minimize` did not return, so `recommendation` would otherwise be unbound below;
        # ask the optimizer for whatever it currently considers its best candidate.
        recommendation = optimizer.provide_recommendation()

    result = {"recommendation": recommendation, "message": msg, "success": success}

    return parse_ng_results(result, self.problem, self.evaluator)

parse_ng_results

parse_ng_results(
    results: dict,
    problem: Problem,
    evaluator: SympyEvaluator,
) -> SolverResults

Parses the optimization results returned by nevergrad solvers.

Parameters:

Name Type Description Default
results dict

the results. A dict with at least the keys recommendation, which points to a parametrization returned by nevergrad solvers, message with information about the optimization, and success indicating whether a recommendation was found successfully or not.

required
problem Problem

the problem the results belong to.

required
evaluator SympyEvaluator

the evaluator used to evaluate the problem.

required

Returns:

Name Type Description
SolverResults SolverResults

a pydantic dataclass with the relevant optimization results.

Source code in desdeo/tools/ng_solver_interfaces.py
def parse_ng_results(results: dict, problem: Problem, evaluator: SympyEvaluator) -> SolverResults:
    """Turn the raw outcome of a nevergrad optimization into a `SolverResults`.

    The recommended variable values are evaluated once with `evaluator`, and the
    values of the problem's objectives, constraints, extra functions, and
    scalarization functions are picked from that evaluation by symbol.

    Args:
        results (dict): the raw outcome. Must contain at least the keys
            `recommendation` (a parametrization returned by a nevergrad solver,
            whose `value` holds the variable values), `message` (information
            about the optimization), and `success` (whether a recommendation
            was found successfully or not).
        problem (Problem): the problem the results belong to.
        evaluator (SympyEvaluator): the evaluator used to evaluate the problem.

    Returns:
        SolverResults: a pydantic dataclass with the relevant optimization results.
    """
    recommended_point = results["recommendation"].value
    was_successful = results["success"]
    message = results["message"]

    evaluated = evaluator.evaluate(recommended_point)

    def _values_of(items):
        # optional parts of the problem are None when absent; that None is passed through
        if items is None:
            return None
        return {item.symbol: evaluated[item.symbol] for item in items}

    objective_values = {obj.symbol: evaluated[obj.symbol] for obj in problem.objectives}
    constraint_values = _values_of(problem.constraints)
    extra_func_values = _values_of(problem.extra_funcs)
    scalarization_values = _values_of(problem.scalarization_funcs)

    return SolverResults(
        optimal_variables=recommended_point,
        optimal_objectives=objective_values,
        constraint_values=constraint_values,
        extra_func_values=extra_func_values,
        scalarization_values=scalarization_values,
        success=was_successful,
        message=message,
    )

Scipy solver interfaces

Solver interfaces to the optimization routines found in scipy.

These solvers can solve various scalarized problems of multiobjective optimization problems.

EvalTargetEnum

Bases: str, Enum

An enum that describes whether the evaluator target is an objective or a constraint.

Source code in desdeo/tools/scipy_solver_interfaces.py
class EvalTargetEnum(str, Enum):
    """Tells whether an evaluator target is an objective or a constraint.

    The members are also `str` instances, so they compare equal to their plain
    string values.
    """

    # the target is an objective
    objective = "objective"
    # the target is a constraint
    constraint = "constraint"

ScipyDeOptions

Bases: BaseModel

Defines a pydantic model to store and pass options to the Scipy differential evolution solver.

Source code in desdeo/tools/scipy_solver_interfaces.py
class ScipyDeOptions(BaseModel):
    """Defines a pydantic model to store and pass options to the Scipy differential evolution solver."""

    # Per-variable initial guesses keyed by variable symbol; None (for the whole
    # field or for a single variable) means that no guess is supplied.
    initial_guess: dict[str, float | None] | None = Field(
        description="The initial guess to be utilized in the solver. For variables with a None as their initial "
        "guess, the mid-point of the variable's lower and upper bound is utilized as the initial "
        "guess. If None, it is assumed that there are no initial guesses for any of the variables.",
        default=None,
    )
    # Keyword arguments handed over to `scipy.optimize.differential_evolution`.
    de_kwargs: dict | None = Field(
        description="Custom keyword arguments to be forwarded to `scipy.optimize.differential_evolution`.", default=None
    )

ScipyDeSolver

Bases: BaseSolver

Creates a scipy solver that utilizes differential evolution.

Source code in desdeo/tools/scipy_solver_interfaces.py
class ScipyDeSolver(BaseSolver):
    """Creates a scipy solver that utilizes differential evolution."""

    def __init__(
        self,
        problem: Problem,
        options: ScipyDeOptions = _default_scipy_de_options,
    ):
        """Creates a solver that utilizes the `scipy.optimize.differential_evolution` routine.

        The `scipy.optimize.differential_evolution` routine is fully accessible through this function.
        For additional details and explanation of some of the arguments, see
        https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.differential_evolution.html

        Args:
            problem (Problem): the multiobjective optimization problem to be solved.
            options (ScipyDeOptions): Pydantic model containing arguments used by scipy DE solver.

        Raises:
            SolverError: if the dimensions of the problem's variables are not supported.
        """
        initial_guess = options.initial_guess
        de_kwargs = options.de_kwargs

        if variable_dimension_enumerate(problem) not in SUPPORTED_VAR_DIMENSIONS:
            msg = "ScipyDeSolver only supports scalar variables."
            raise SolverError(msg)

        self.problem = problem
        if de_kwargs is None:
            de_kwargs = {
                "strategy": "best1bin",
                "maxiter": 1000,
                "popsize": 15,
                "tol": 0.01,
                "mutation": (0.5, 1),
                "recombination": 0.7,
                "seed": None,
                "callback": None,
                "disp": False,
                "polish": True,
                "init": "latinhypercube",
                "atol": 0,
                "updating": "deferred",
                "workers": 1,
                "integrality": None,
                "vectorized": True,  # the constraints for scipy_de need to be fixed first for this to work
            }
        self.de_kwargs = de_kwargs

        # variable bounds
        self.bounds = get_variable_bounds_pairs(problem)

        # initial guess. If no guess is present for a variable, the mid-point of said variable's
        # lower and upper bound is used instead
        if initial_guess is None:
            self.initial_guess = set_initial_guess(problem)
        else:
            # `x0` of the scipy routine expects a sequence ordered like the variables, not a dict
            resolved_guess = []
            for var in problem.variables:
                guess = initial_guess.get(var.symbol)
                if guess is None:
                    # no usable guess for this variable (assumes both of its bounds are set)
                    guess = (var.lowerbound + var.upperbound) / 2
                resolved_guess.append(guess)
            self.initial_guess = resolved_guess

        self.evaluator = PolarsEvaluator(problem)
        self.constraints = (
            create_scipy_object_constraints(self.problem, self.evaluator)
            if self.problem.constraints is not None
            else ()
        )

    def solve(self, target: str) -> SolverResults:
        """Solve the problem for a given target.

        Args:
            target (str): the symbol of the objective function to be optimized.

        Returns:
            SolverResults: results of the optimization.
        """
        # If the target is an objective symbol, use its _min form
        objective_symbols = {obj.symbol for obj in self.problem.objectives}
        eval_target = f"{target}_min" if target in objective_symbols else target

        optimization_result: _ScipyOptimizeResult = _scipy_de(
            get_scipy_eval(self.problem, self.evaluator, eval_target, EvalTargetEnum.objective),
            bounds=self.bounds,
            x0=self.initial_guess,
            constraints=self.constraints,
            **self.de_kwargs,
        )

        # parse the results
        return parse_scipy_optimization_result(optimization_result, self.problem, self.evaluator)

__init__

__init__(
    problem: Problem,
    options: ScipyDeOptions = _default_scipy_de_options,
)

Creates a solver that utilizes the scipy.optimize.differential_evolution routine.

The scipy.optimize.differential_evolution routine is fully accessible through this function. For additional details and explanation of some of the arguments, see https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.differential_evolution.html

Parameters:

Name Type Description Default
problem Problem

the multiobjective optimization problem to be solved.

required
options ScipyDeOptions

Pydantic model containing arguments used by scipy DE solver.

_default_scipy_de_options
Source code in desdeo/tools/scipy_solver_interfaces.py
def __init__(
    self,
    problem: Problem,
    options: ScipyDeOptions = _default_scipy_de_options,
):
    """Creates a solver that utilizes the `scipy.optimize.differential_evolution` routine.

    The `scipy.optimize.differential_evolution` routine is fully accessible through this function.
    For additional details and explanation of some of the arguments, see
    https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.differential_evolution.html

    Args:
        problem (Problem): the multiobjective optimization problem to be solved.
        options (ScipyDeOptions): Pydantic model containing arguments used by scipy DE solver.

    Raises:
        SolverError: if the dimensions of the problem's variables are not supported.
    """
    initial_guess = options.initial_guess
    de_kwargs = options.de_kwargs

    if variable_dimension_enumerate(problem) not in SUPPORTED_VAR_DIMENSIONS:
        msg = "ScipyDeSolver only supports scalar variables."
        raise SolverError(msg)

    self.problem = problem
    if de_kwargs is None:
        de_kwargs = {
            "strategy": "best1bin",
            "maxiter": 1000,
            "popsize": 15,
            "tol": 0.01,
            "mutation": (0.5, 1),
            "recombination": 0.7,
            "seed": None,
            "callback": None,
            "disp": False,
            "polish": True,
            "init": "latinhypercube",
            "atol": 0,
            "updating": "deferred",
            "workers": 1,
            "integrality": None,
            "vectorized": True,  # the constraints for scipy_de need to be fixed first for this to work
        }
    self.de_kwargs = de_kwargs

    # variable bounds
    self.bounds = get_variable_bounds_pairs(problem)

    # initial guess. If no guess is present for a variable, the mid-point of said variable's
    # lower and upper bound is used instead
    if initial_guess is None:
        self.initial_guess = set_initial_guess(problem)
    else:
        # `x0` of the scipy routine expects a sequence ordered like the variables, not a dict
        resolved_guess = []
        for var in problem.variables:
            guess = initial_guess.get(var.symbol)
            if guess is None:
                # no usable guess for this variable (assumes both of its bounds are set)
                guess = (var.lowerbound + var.upperbound) / 2
            resolved_guess.append(guess)
        self.initial_guess = resolved_guess

    self.evaluator = PolarsEvaluator(problem)
    self.constraints = (
        create_scipy_object_constraints(self.problem, self.evaluator)
        if self.problem.constraints is not None
        else ()
    )

solve

solve(target: str) -> SolverResults

Solve the problem for a given target.

Parameters:

Name Type Description Default
target str

the symbol of the objective function to be optimized.

required

Returns:

Name Type Description
SolverResults SolverResults

results of the optimization.

Source code in desdeo/tools/scipy_solver_interfaces.py
def solve(self, target: str) -> SolverResults:
    """Run differential evolution on the problem for the given target.

    Args:
        target (str): the symbol of the objective function to be optimized.

    Returns:
        SolverResults: results of the optimization.
    """
    # objectives are evaluated through their '<symbol>_min' form; any other symbol is used as is
    if target in {obj.symbol for obj in self.problem.objectives}:
        eval_target = f"{target}_min"
    else:
        eval_target = target

    objective_fn = get_scipy_eval(self.problem, self.evaluator, eval_target, EvalTargetEnum.objective)

    raw_result: _ScipyOptimizeResult = _scipy_de(
        objective_fn,
        bounds=self.bounds,
        x0=self.initial_guess,
        constraints=self.constraints,
        **self.de_kwargs,
    )

    # convert scipy's result object into the solver's result type
    return parse_scipy_optimization_result(raw_result, self.problem, self.evaluator)

ScipyMinimizeOptions

Bases: BaseModel

Defines a pydantic model to store and pass options to the Scipy Minimize solver.

Source code in desdeo/tools/scipy_solver_interfaces.py
class ScipyMinimizeOptions(BaseModel):
    """Defines a pydantic model to store and pass options to the Scipy Minimize solver."""

    # Per-variable initial guesses keyed by variable symbol; None (for the whole
    # field or for a single variable) means that no guess is supplied.
    initial_guess: dict[str, float | None] | None = Field(
        description="The initial guess to be utilized in the solver. For variables with a None as their "
        "initial guess, the mid-point of the variable's lower and upper bound is utilized as the "
        "initial guess. If None, it is assumed that there are no initial guesses for any of the variables.",
        default=None,
    )
    # Name of the `scipy.optimize.minimize` method; None leaves the choice to be made automatically.
    method: str | None = Field(
        description="The scipy.optimize.minimize method to be used. If None, a method is selected "
        "automatically based on the properties of the objective (does it have constraints?).",
        default=None,
    )
    # Keyword arguments for the chosen method.
    method_kwargs: dict | None = Field(
        description="The keyword arguments passed to the scipy.optimize.minimize method.", default=None
    )
    # Termination tolerance.
    tol: float | None = Field(description="Tolerance for termination.", default=None)
    # Any further solver options.
    additional_options: dict | None = Field(description="Additional solver options.", default=None)

ScipyMinimizeSolver

Bases: BaseSolver

Creates a scipy solver that utilizes the minimization routine.

Source code in desdeo/tools/scipy_solver_interfaces.py
class ScipyMinimizeSolver(BaseSolver):
    """Creates a scipy solver that utilizes the `minimization` routine."""

    def __init__(self, problem: Problem, options: ScipyMinimizeOptions = _default_scipy_minimize_options):
        """Initializes a solver that utilizes the `scipy.optimize.minimize` routine.

        The `scipy.optimize.minimize` routine is fully accessible through this function.
        For additional details and explanation of some of the arguments, see
        https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html#scipy.optimize.minimize

        Args:
            problem (Problem): the multiobjective optimization problem to be solved.
            options (ScipyMinimizeOptions): Pydantic model containing args for scipy minimize solver.

        Raises:
            SolverError: if the dimensions of the problem's variables are not supported.
        """
        if variable_dimension_enumerate(problem) not in SUPPORTED_VAR_DIMENSIONS:
            msg = "ScipyMinimizeSolver only supports scalar variables."
            raise SolverError(msg)

        initial_guess = options.initial_guess
        self.problem = problem
        self.method = options.method
        # NOTE: stored for reference only; `solve` of this class does not forward it.
        self.method_kwargs = options.method_kwargs
        self.tol = options.tol
        self.additional_options = options.additional_options

        # variables bounds as (min, max pairs)
        self.bounds = get_variable_bounds_pairs(problem)

        # the initial guess as a simple sequence. If no initial value is set for some variable,
        # then the initial value defaults to middle of the upper and lower bounds.
        if initial_guess is not None:
            resolved_guess = []
            for var in self.problem.variables:
                # a missing symbol is treated like an explicit None
                guess = initial_guess.get(var.symbol)
                if guess is None:
                    # no usable guess for this variable (assumes both of its bounds are set)
                    guess = (var.lowerbound + var.upperbound) / 2
                resolved_guess.append(guess)
            self.initial_guess = resolved_guess
        else:
            self.initial_guess = set_initial_guess(problem)

        self.evaluator = PolarsEvaluator(problem)

        self.constraints = (
            create_scipy_dict_constraints(self.problem, self.evaluator)
            if self.problem.constraints is not None
            else None
        )

    def solve(self, target: str) -> SolverResults:
        """Solves the problem for a given target.

        Args:
            target (str): the symbol of the objective function to be optimized.

        Returns:
            SolverResults: results of the optimization.
        """
        # If the target is an objective symbol, use its _min form
        objective_symbols = {obj.symbol for obj in self.problem.objectives}
        eval_target = f"{target}_min" if target in objective_symbols else target

        optimization_result: _ScipyOptimizeResult = _scipy_minimize(
            get_scipy_eval(self.problem, self.evaluator, eval_target, EvalTargetEnum.objective),
            self.initial_guess,
            method=self.method,
            bounds=self.bounds,
            constraints=self.constraints,
            options=self.additional_options,
            tol=self.tol,
        )

        # parse and return the results
        return parse_scipy_optimization_result(optimization_result, self.problem, self.evaluator)

__init__

__init__(
    problem: Problem,
    options: ScipyMinimizeOptions = _default_scipy_minimize_options,
)

Initializes a solver that utilizes the scipy.optimize.minimize routine.

The scipy.optimize.minimize routine is fully accessible through this function. For additional details and explanation of some of the arguments, see https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html#scipy.optimize.minimize

Parameters:

Name Type Description Default
problem Problem

the multiobjective optimization problem to be solved.

required
options ScipyMinimizeOptions

Pydantic model containing args for scipy minimize solver.

_default_scipy_minimize_options
Source code in desdeo/tools/scipy_solver_interfaces.py
def __init__(self, problem: Problem, options: ScipyMinimizeOptions = _default_scipy_minimize_options):
    """Initializes a solver that utilizes the `scipy.optimize.minimize` routine.

    The `scipy.optimize.minimize` routine is fully accessible through this function.
    For additional details and explanation of some of the arguments, see
    https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html#scipy.optimize.minimize

    Args:
        problem (Problem): the multiobjective optimization problem to be solved.
        options (ScipyMinimizeOptions): Pydantic model containing args for scipy minimize solver.

    Raises:
        SolverError: if the dimensions of the problem's variables are not supported.
    """
    if variable_dimension_enumerate(problem) not in SUPPORTED_VAR_DIMENSIONS:
        msg = "ScipyMinimizeSolver only supports scalar variables."
        raise SolverError(msg)

    initial_guess = options.initial_guess
    self.problem = problem
    self.method = options.method
    self.method_kwargs = options.method_kwargs
    self.tol = options.tol
    self.additional_options = options.additional_options

    # variables bounds as (min, max pairs)
    self.bounds = get_variable_bounds_pairs(problem)

    # the initial guess as a simple sequence. If no initial value is set for some variable,
    # then the initial value defaults to middle of the upper and lower bounds.
    if initial_guess is not None:
        resolved_guess = []
        for var in self.problem.variables:
            # a missing symbol is treated like an explicit None
            guess = initial_guess.get(var.symbol)
            if guess is None:
                # no usable guess for this variable (assumes both of its bounds are set)
                guess = (var.lowerbound + var.upperbound) / 2
            resolved_guess.append(guess)
        self.initial_guess = resolved_guess
    else:
        self.initial_guess = set_initial_guess(problem)

    self.evaluator = PolarsEvaluator(problem)

    self.constraints = (
        create_scipy_dict_constraints(self.problem, self.evaluator)
        if self.problem.constraints is not None
        else None
    )

solve

solve(target: str) -> SolverResults

Solves the problem for a given target.

Parameters:

Name Type Description Default
target str

the symbol of the objective function to be optimized.

required

Returns:

Name Type Description
SolverResults SolverResults

results of the optimization.

Source code in desdeo/tools/scipy_solver_interfaces.py
def solve(self, target: str) -> SolverResults:
    """Run `scipy.optimize.minimize` on the problem for the given target.

    Args:
        target (str): the symbol of the objective function to be optimized.

    Returns:
        SolverResults: results of the optimization.
    """
    # objectives are evaluated through their '<symbol>_min' form; any other symbol is used as is
    if target in {obj.symbol for obj in self.problem.objectives}:
        eval_target = f"{target}_min"
    else:
        eval_target = target

    objective_fn = get_scipy_eval(self.problem, self.evaluator, eval_target, EvalTargetEnum.objective)

    raw_result: _ScipyOptimizeResult = _scipy_minimize(
        objective_fn,
        self.initial_guess,
        method=self.method,
        bounds=self.bounds,
        constraints=self.constraints,
        options=self.additional_options,
        tol=self.tol,
    )

    # convert scipy's result object into the solver's result type
    return parse_scipy_optimization_result(raw_result, self.problem, self.evaluator)

create_scipy_dict_constraints

create_scipy_dict_constraints(
    problem: Problem, evaluator: PolarsEvaluator
) -> dict

Creates a dict with scipy compatible constraints.

It is assumed that there are constraints defined in problem.

Parameters:

Name Type Description Default
problem Problem

the Problem with the constraints.

required
evaluator PolarsEvaluator

the evaluator utilized to evaluate problem.

required

Returns:

Name Type Description
dict dict

a dict with scipy compatible constraints.

Source code in desdeo/tools/scipy_solver_interfaces.py
def create_scipy_dict_constraints(problem: Problem, evaluator: PolarsEvaluator) -> dict:
    """Build scipy compatible constraint dicts for the constraints of a problem.

    `problem` is assumed to have constraints defined. One dict is produced per
    constraint, holding the scipy constraint type under `"type"` and a callable
    evaluating the constraint under `"fun"`.

    Note:
        The dicts are returned collected in a list (one entry per constraint),
        even though the return annotation reads `dict`.

    Args:
        problem (Problem): the Problem with the constraints.
        evaluator (PolarsEvaluator): the evaluator utilized to evaluate problem.

    Returns:
        dict: the scipy compatible constraints (see the note above).
    """
    scipy_constraints = []

    for constraint in problem.constraints:
        # LTE constraints are passed on as inequalities, every other type as an equality
        kind = "ineq" if constraint.cons_type == ConstraintTypeEnum.LTE else "eq"
        fun = get_scipy_eval(problem, evaluator, constraint.symbol, eval_target=EvalTargetEnum.constraint)
        scipy_constraints.append({"type": kind, "fun": fun})

    return scipy_constraints

create_scipy_object_constraints

create_scipy_object_constraints(
    problem: Problem, evaluator: PolarsEvaluator
) -> list[NonlinearConstraint]

Creates a list with scipy constraint object NonLinearConstraints used by some scipy routines.

For more information, see https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.NonlinearConstraint.html#scipy-optimize-nonlinearconstraint

Parameters:

Name Type Description Default
problem Problem

the problem with the original constraint to be utilized in creating the list of constraints.

required
evaluator PolarsEvaluator

the evaluator corresponding to problem that can be used to evaluate the constraints.

required

Returns:

Type Description
list[NonlinearConstraint]

list[NonlinearConstraint]: a list of scipy's NonLinearConstraint objects.

Source code in desdeo/tools/scipy_solver_interfaces.py
def create_scipy_object_constraints(problem: Problem, evaluator: PolarsEvaluator) -> NonlinearConstraint:
    """Creates the scipy constraint object `NonlinearConstraint` used by some scipy routines.

    For more information, see https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.NonlinearConstraint.html#scipy-optimize-nonlinearconstraint

    Args:
        problem (Problem): the problem with the original constraints to be utilized in creating the
            constraint object.
        evaluator (PolarsEvaluator): the evaluator corresponding to problem that can be used to evaluate
            the constraints.

    Returns:
        NonlinearConstraint: a single scipy NonlinearConstraint object whose function evaluates
            all the constraints defined in problem at once.
    """
    return NonlinearConstraint(
        # The target symbol is left empty on purpose: it is not used when constraints are evaluated.
        fun=get_scipy_eval(problem, evaluator, "", eval_target=EvalTargetEnum.constraint),
        lb=0,  # constraint value must be between 0 and inf, e.g., positive.
        ub=float("inf"),  # since in scipy, a constraint is respected when its value is positive. See scipy_eval.
    )

get_scipy_eval

get_scipy_eval(
    problem: Problem,
    evaluator: PolarsEvaluator,
    target: str,
    eval_target: EvalTargetEnum,
) -> Callable[[list[float | int]], list[float | int]]

Wraps the problem and evaluator into a callable function that can be used by scipy routines.

The returned function expects an array-like argument, such as a numpy array or list.

Parameters:

Name Type Description Default
problem Problem

the problem being solved.

required
evaluator GenericEvaluator

the evaluator to evaluate the problem being solved.

required
target str

the symbol of the objective to be optimized, defined in problem.

required
eval_target EvalTargetEnum

either objective or constraint. If objective, it is assumed that the evaluation is about evaluating the objective function of the single-objective optimization problem being solved, e.g., a scalarization function defined in problem. If constraint, then the evaluation is assumed to be about evaluating the constraints defined in problem.

required

Returns:

Type Description
Callable[[list[float | int]], list[float | int]]

Callable[[list[float | int]], list[float | int]]: a function that takes as its argument an array like object.

Note

Constraints in scipy are defined such that a positive number means the constraint is respected. In DESDEO, this is the opposite, e.g., a positive number means a constraint is breached. We take this into account when returning the constraint values, but this does not affect the constraint values computed for the true constraints.

Source code in desdeo/tools/scipy_solver_interfaces.py
def get_scipy_eval(
    problem: Problem,
    evaluator: PolarsEvaluator,
    target: str,
    eval_target: EvalTargetEnum,
) -> Callable[[list[float | int]], list[float | int]]:
    """Wraps the problem and evaluator into a callable function that can be used by scipy routines.

    The returned function expects an array-like argument, such as a numpy array or list.

    Args:
        problem (Problem): the problem being solved.
        evaluator (PolarsEvaluator): the evaluator to evaluate the problem being solved.
        target (str): the symbol of the function to be optimized, defined in problem. Only used
            when `eval_target` is objective; it is ignored when constraints are evaluated.
        eval_target (EvalTargetEnum): either objective or constraint. If objective,
            it is assumed that the evaluation is about evaluating the objective function
            of the single-objective optimization problem being solved, e.g., a scalarization function
            defined in problem. If constraint, then the evaluation is assumed to be about evaluating
            all the constraints defined in problem.

    Returns:
        Callable[[list[float | int]], list[float | int]]: a function that takes as its argument
            an array like object.

    Note:
        Constraints in scipy are defined such that a positive number means the constraint
            is respected. In DESDEO, this is the opposite, e.g., a positive number means
            a constraint is breached. We take this into account when returning the
            constraint values, but this does not affect the constraint values computed
            for the true constraints.
    """

    def scipy_eval(x: list[float | int]) -> list[float | int]:
        """An evaluator to be used in scipy routines.

        Args:
            x (list[float | int]): an array like, such as a numpy array or list. Its i'th element
                is the value given to the i'th variable of the problem.

        Raises:
            SolverError: when an invalid evaluator target is specified.

        Returns:
            list[float | int]: an array like.
        """
        # TODO: Consider caching the results of evaluator.evaluate
        # Plain scalars are wrapped in a list; anything else is handed to the evaluator as is.
        evaluator_args = {
            variable.symbol: [x[i]] if isinstance(x[i], float | int) else x[i]
            for i, variable in enumerate(problem.variables)
        }

        if eval_target == EvalTargetEnum.objective:
            evaluator_res = evaluator.evaluate(evaluator_args)

            # evaluate the objective (scalarized)
            return evaluator_res.to_dict(as_series=False)[target]

        if eval_target == EvalTargetEnum.constraint:
            evaluator_df = evaluator.evaluate(evaluator_args)
            # evaluate the constraints
            # The sign is flipped because scipy expects positive constraint values when a constraint
            # is respected. But in DESDEO, we define constraints s.t. a negative value means the constraint
            # is respected.
            con_symbols = [constraint.symbol for constraint in problem.constraints]
            res_dict = evaluator_df[con_symbols].to_dict(as_series=False)

            res = np.array([np.array(res_dict[symbol]) for symbol in con_symbols])

            # squeeze important for minimization routines
            return -np.squeeze(res, axis=-1) if res.shape[-1] == 1 else -res

        # non-existing eval_target
        msg = f"'eval_target' = '{eval_target}' not supported. Must be one of {list(EvalTargetEnum)}."
        raise SolverError(msg)

    return scipy_eval

get_variable_bounds_pairs

get_variable_bounds_pairs(
    problem: Problem,
) -> list[tuple[float | int, float | int]]

Returns the variable bounds defined in a Problem as a list of tuples.

Parameters:

Name Type Description Default
problem Problem

the problem with the variables of interest.

required

Returns:

Type Description
list[tuple[float | int, float | int]]

list[tuple[float | int, float | int]]: a list of tuples, the first element of each tuple is the lower bound of a variable and the second its upper bound. Each tuple corresponds to a variable.

Source code in desdeo/tools/scipy_solver_interfaces.py
def get_variable_bounds_pairs(problem: Problem) -> list[tuple[float | int, float | int]]:
    """Collects the bounds of every variable in a Problem as (lower, upper) tuples.

    Args:
        problem (Problem): the problem whose variables are inspected.

    Returns:
        list[tuple[float | int, float | int]]: one tuple per variable, in the same order as
            `problem.variables`. The first element of a tuple is the variable's lower bound
            and the second element its upper bound.
    """
    bounds_pairs = []
    for var in problem.variables:
        bounds_pairs.append((var.lowerbound, var.upperbound))

    return bounds_pairs

parse_scipy_optimization_result

parse_scipy_optimization_result(
    optimization_result: OptimizeResult,
    problem: Problem,
    evaluator: PolarsEvaluator,
) -> SolverResults

Parses the optimization results returned by various scipy methods.

For documentation, see https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.OptimizeResult.html#scipy.optimize.OptimizeResult

Parameters:

Name Type Description Default
optimization_result OptimizeResult

the optimization results.

required
problem Problem

the problem to which the optimization results correspond.

required
evaluator GenericEvaluator

the evaluator that has been used in computing the optimization results.

required

Returns:

Name Type Description
SolverResults SolverResults

a pydantic dataclass with the relevant optimization results.

Source code in desdeo/tools/scipy_solver_interfaces.py
def parse_scipy_optimization_result(
    optimization_result: _ScipyOptimizeResult, problem: Problem, evaluator: PolarsEvaluator
) -> SolverResults:
    """Parses the optimization results returned by various scipy methods.

    For documentation, see https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.OptimizeResult.html#scipy.optimize.OptimizeResult

    The optimal variable values found by scipy are evaluated once more with `evaluator`, and
    the values of the objectives, constraints, extra functions, and scalarization functions
    defined in `problem` are then read from the result of that evaluation.

    Args:
        optimization_result (_ScipyOptimizeResult): the optimization results.
        problem (Problem): the problem to which the optimization results correspond.
        evaluator (PolarsEvaluator): the evaluator that has been used in computing the optimization results.

    Returns:
        SolverResults: a pydantic dataclass with the relevant optimization results. The constraint,
            extra function, and scalarization values are `None` when the problem defines none of them.
    """
    x_opt = optimization_result.x
    success_opt = optimization_result.success
    msg_opt = optimization_result.message

    results = evaluator.evaluate({variable.symbol: [x_opt[i]] for i, variable in enumerate(problem.variables)})

    def _first_values_or_none(items):
        """Maps the symbol of each item to the first evaluated value of that symbol, or returns None."""
        if items is None:
            return None
        return {item.symbol: results[item.symbol][0] for item in items}

    return SolverResults(
        optimal_variables={variable.symbol: x_opt[i] for i, variable in enumerate(problem.variables)},
        # objectives are always expected to be defined, hence no None check for them
        optimal_objectives={obj.symbol: results[obj.symbol][0] for obj in problem.objectives},
        constraint_values=_first_values_or_none(problem.constraints),
        extra_func_values=_first_values_or_none(problem.extra_funcs),
        scalarization_values=_first_values_or_none(problem.scalarization_funcs),
        success=success_opt,
        message=msg_opt,
    )

set_initial_guess

set_initial_guess(problem: Problem) -> list[float | int]

Sets or gets the initial guess for each variable defined in a Problem.

For variables without initial guess, the initial guess is set to be the middle point of said variable's lower and upper bound.

Parameters:

Name Type Description Default
problem Problem

the problem with the variables of which the initial values are of interest.

required

Returns:

Type Description
list[float | int]

list[float | int]: a list of numbers, each number represents the initial guess of each variable in the problem.

Source code in desdeo/tools/scipy_solver_interfaces.py
def set_initial_guess(problem: Problem) -> list[float | int]:
    """Builds the list of initial guesses for the variables defined in a Problem.

    A variable's own initial value is used whenever it is set. Otherwise, the guess
    is the middle point between the variable's lower and upper bound.

    Args:
        problem (Problem): the problem whose variables need an initial guess.

    Returns:
        list[float | int]: one number per variable, in the same order as `problem.variables`.
    """
    guesses = []
    for var in problem.variables:
        if var.initial_value is None:
            # no initial value given: fall back to the midpoint of the bounds
            guesses.append((var.upperbound - var.lowerbound) / 2 + var.lowerbound)
        else:
            guesses.append(var.initial_value)

    return guesses

Pyomo solver interfaces

Defines solver interfaces for pyomo.

_default_bonmin_options module-attribute

_default_bonmin_options = BonminOptions()

Defines Bonmin options with default values.

_default_cbc_options module-attribute

_default_cbc_options = CbcOptions()

Defines CBC options with default values.

_default_ipopt_options module-attribute

_default_ipopt_options = IpoptOptions()

Defines Ipopt options with default values.

BonminOptions

Bases: BaseModel

Defines a pydantic model to store and pass options to the Bonmin solver.

Because Bonmin utilizes many sub-solvers, the options specific to Bonmin must be prefixed in their name with 'bonmin.{option_name}', e.g., bonmin.integer_tolerance. For a list of options, see https://www.coin-or.org/Bonmin/options_list.html

Note

Not all options are available through this model. Please add options as they are needed and make a pull request.

Source code in desdeo/tools/pyomo_solver_interfaces.py
class BonminOptions(BaseModel):
    """Defines a pydantic model to store and pass options to the Bonmin solver.

    Because Bonmin utilizes many sub-solvers, the options specific to Bonmin
    must be prefixed in their name with 'bonmin.{option_name}',
    e.g., `bonmin.integer_tolerance`. For a list of options, see
    https://www.coin-or.org/Bonmin/options_list.html

    In this model, such options are stored in fields named `bonmin_{option_name}`.
    Use `asdict` to get the options with the keys in the format expected by Bonmin.

    Note:
        Not all options are available through this model.
        Please add options as they are needed and make a pull request.
    """

    tol: float = Field(description="Sets the convergence tolerance of ipopt. Defaults to 1e-8.", default=1e-8)
    """Sets the convergence tolerance of ipopt. Defaults to 1e-8."""

    bonmin_integer_tolerance: float = Field(
        description="Numbers within this value of an integer are considered integers. Defaults to 1e-6.", default=1e-6
    )
    """Numbers within this value of an integer are considered integers. Defaults to 1e-6."""

    bonmin_algorithm: str = Field(
        description=(
            "Presets some of the options in Bonmin based on the algorithm choice. Defaults to 'B-BB'. "
            "A good first option to try is 'B-Hyb'."
        ),
        default="B-BB",
    )
    """Presets some of the options in Bonmin based on the algorithm choice. Defaults to 'B-BB'.
    A good first option to try is 'B-Hyb'.
    """

    def asdict(self) -> dict[str, float | str]:
        """Converts the Pydantic model into a dict so that Bonmin specific options are in the correct format.

        This means that the attributes starting with `bonmin_optionname` will be
        converted to keys in the format `bonmin.optionname` in the returned dict.
        All other attributes keep their name as the key.

        Returns:
            dict[str, float | str]: the option values of this instance keyed by their option names.
        """
        output = {}
        # The fields are read from the class of the instance so that fields added by
        # subclasses are included as well.
        for field_name in type(self).model_fields:
            prefix, _, option_name = field_name.partition("_")
            if prefix == "bonmin":
                # Convert to Bonmin specific format
                output[f"bonmin.{option_name}"] = getattr(self, field_name)
            else:
                # Keep the field as is
                output[field_name] = getattr(self, field_name)

        return output

bonmin_algorithm class-attribute instance-attribute

bonmin_algorithm: str = Field(
    description="Presets some of the options in Bonmin based on the algorithm choice. Defaults to 'B-BB'. A good first option to try is 'B-Hyb'.",
    default="B-BB",
)

Presets some of the options in Bonmin based on the algorithm choice. Defaults to 'B-BB'. A good first option to try is 'B-Hyb'.

bonmin_integer_tolerance class-attribute instance-attribute

bonmin_integer_tolerance: float = Field(
    description="Numbers within this value of an integer are considered integers. Defaults to 1e-6.",
    default=1e-06,
)

Numbers within this value of an integer are considered integers. Defaults to 1e-6.

tol class-attribute instance-attribute

tol: float = Field(
    description="Sets the convergence tolerance of ipopt. Defaults to 1e-8.",
    default=1e-08,
)

Sets the convergence tolerance of ipopt. Defaults to 1e-8.

asdict

asdict() -> dict[str, float]

Converts the Pydantic model into a dict so that Bonmin specific options are in the correct format.

This means that the attributes starting with bonmin_optionname will be converted to keys in the format bonmin.optionname in the returned dict.

Source code in desdeo/tools/pyomo_solver_interfaces.py
def asdict(self) -> dict[str, float | str]:
    """Converts the Pydantic model into a dict so that Bonmin specific options are in the correct format.

    This means that the attributes starting with `bonmin_optionname` will be
    converted to keys in the format `bonmin.optionname` in the returned dict.
    All other attributes keep their name as the key.

    Returns:
        dict[str, float | str]: the option values of this instance keyed by their option names.
    """
    output = {}
    # The fields are read from the class of the instance so that fields added by
    # subclasses are included as well.
    for field_name in type(self).model_fields:
        prefix, _, option_name = field_name.partition("_")
        if prefix == "bonmin":
            # Convert to Bonmin specific format
            output[f"bonmin.{option_name}"] = getattr(self, field_name)
        else:
            # Keep the field as is
            output[field_name] = getattr(self, field_name)

    return output

CbcOptions

Bases: BaseModel

Defines a pydantic dataclass to pass options to the CBC solver.

For more information and documentation on the options, see https://github.com/coin-or/Cbc

Note

Not all options are available through this model. Please add options as they are needed and make a pull request.

Source code in desdeo/tools/pyomo_solver_interfaces.py
class CbcOptions(BaseModel):
    """Pydantic model holding the options that are passed to the CBC solver.

    The documentation of CBC and its options can be found at
    https://github.com/coin-or/Cbc

    Note:
        Only a subset of the options of CBC is covered by this model.
        Please add options as they are needed and make a pull request.
    """

    model_config = ConfigDict(populate_by_name=True, frozen=True)

    seconds: int = Field(
        default=600,
        description="The maximum amount of time (in seconds) the solver should run. Defaults to 600.",
    )
    """The maximum amount of time (in seconds) the solver should run. Defaults to 600."""

    threads: int = Field(
        default=4,
        description="Number of threads (cores) to use for solving the problem. Defaults to 4.",
    )
    """Number of threads (cores) to use for solving the problem. Defaults to 4."""

    log_level: int = Field(
        default=2,
        alias="logLevel",
        description=(
            "Controls the level of logging output. Values range from 0 (no output) to 5 (very detailed output)."
            " Defaults to 2."
        ),
    )
    """Controls the level of logging output. Values range from 0 (no output) to 5 (very detailed output).
    Defaults to 2.
    """

    max_solutions: int = Field(
        default=10,
        alias="maxSolutions",
        description="Limits the number of feasible solutions found by the solver. Defaults to 10.",
    )
    """Limits the number of feasible solutions found by the solver. Defaults to 10."""

    max_nodes: int = Field(
        default=1000,
        alias="maxNodes",
        description="Sets the maximum number of branch-and-bound nodes to explore. Defaults to 1000.",
    )
    """Sets the maximum number of branch-and-bound nodes to explore. Defaults to 1000."""

    ratio_gap: float = Field(
        default=0.01,
        alias="ratioGap",
        description=(
            "Sets the relative MIP gap (as a fraction of the optimal solution value) at which the solver will"
            " terminate. Defaults to 0.01."
        ),
    )
    """Sets the relative MIP gap (as a fraction of the optimal solution value) at which the solver will terminate.
    Defaults to 0.01.
    """

    absolute_gap: float = Field(
        default=1.0,
        alias="absoluteGap",
        description=(
            "Sets the absolute MIP gap (an absolute value) at which the solver will terminate.  Defaults to 1.0."
        ),
    )
    """Sets the absolute MIP gap (an absolute value) at which the solver will terminate. Defaults to 1.0."""

    solve: str = Field(
        default="branchAndCut",
        description=(
            "Determines the strategy to use for solving the problem (e.g., 'branchAndCut', 'tree', 'trunk')."
            " Defaults to 'branchAndCut'."
        ),
    )
    """Determines the strategy to use for solving the problem (e.g., 'branchAndCut', 'tree', 'trunk').
    Defaults to 'branchAndCut'.
    """

    presolve: int = Field(
        default=2,
        description="Controls the presolve level (0: no presolve, 1: default, 2: aggressive). Defaults to 2.",
    )
    """Controls the presolve level (0: no presolve, 1: default, 2: aggressive). Defaults to 2."""

    feasibility_tolerance: float = Field(
        default=1e-6,
        alias="feasibilityTolerance",
        description="Sets the feasibility tolerance for constraints. Defaults to 1e-6.",
    )
    """Sets the feasibility tolerance for constraints. Defaults to 1e-6."""

    integer_tolerance: float = Field(
        default=1e-5,
        alias="integerTolerance",
        description="Sets the tolerance for integrality of integer variables. Defaults to 1e-5.",
    )
    """Sets the tolerance for integrality of integer variables. Defaults to 1e-5."""

absolute_gap class-attribute instance-attribute

absolute_gap: float = Field(
    alias="absoluteGap",
    description="Sets the absolute MIP gap (an absolute value) at which the solver will terminate.  Defaults to 1.0.",
    default=1.0,
)

Sets the absolute MIP gap (an absolute value) at which the solver will terminate. Defaults to 1.0.

feasibility_tolerance class-attribute instance-attribute

feasibility_tolerance: float = Field(
    alias="feasibilityTolerance",
    description="Sets the feasibility tolerance for constraints. Defaults to 1e-6.",
    default=1e-06,
)

Sets the feasibility tolerance for constraints. Defaults to 1e-6.

integer_tolerance class-attribute instance-attribute

integer_tolerance: float = Field(
    alias="integerTolerance",
    description="Sets the tolerance for integrality of integer variables. Defaults to 1e-5.",
    default=1e-05,
)

Sets the tolerance for integrality of integer variables. Defaults to 1e-5.

log_level class-attribute instance-attribute

log_level: int = Field(
    alias="logLevel",
    description="Controls the level of logging output. Values range from 0 (no output) to 5 (very detailed output). Defaults to 2.",
    default=2,
)

Controls the level of logging output. Values range from 0 (no output) to 5 (very detailed output). Defaults to 2.

max_nodes class-attribute instance-attribute

max_nodes: int = Field(
    alias="maxNodes",
    description="Sets the maximum number of branch-and-bound nodes to explore. Defaults to 1000.",
    default=1000,
)

Sets the maximum number of branch-and-bound nodes to explore. Defaults to 1000.

max_solutions class-attribute instance-attribute

max_solutions: int = Field(
    alias="maxSolutions",
    description="Limits the number of feasible solutions found by the solver. Defaults to 10.",
    default=10,
)

Limits the number of feasible solutions found by the solver. Defaults to 10.

presolve class-attribute instance-attribute

presolve: int = Field(
    description="Controls the presolve level (0: no presolve, 1: default, 2: aggressive). Defaults to 2.",
    default=2,
)

Controls the presolve level (0: no presolve, 1: default, 2: aggressive). Defaults to 2.

ratio_gap class-attribute instance-attribute

ratio_gap: float = Field(
    alias="ratioGap",
    description="Sets the relative MIP gap (as a fraction of the optimal solution value) at which the solver will terminate. Defaults to 0.01.",
    default=0.01,
)

Sets the relative MIP gap (as a fraction of the optimal solution value) at which the solver will terminate. Defaults to 0.01.

seconds class-attribute instance-attribute

seconds: int = Field(
    description="The maximum amount of time (in seconds) the solver should run. Defaults to 600.",
    default=600,
)

The maximum amount of time (in seconds) the solver should run. Defaults to 600.

solve class-attribute instance-attribute

solve: str = Field(
    description="Determines the strategy to use for solving the problem (e.g., 'branchAndCut', 'tree', 'trunk'). Defaults to 'branchAndCut'.",
    default="branchAndCut",
)

Determines the strategy to use for solving the problem (e.g., 'branchAndCut', 'tree', 'trunk'). Defaults to 'branchAndCut'.

threads class-attribute instance-attribute

threads: int = Field(
    description="Number of threads (cores) to use for solving the problem. Defaults to 4.",
    default=4,
)

Number of threads (cores) to use for solving the problem. Defaults to 4.

IpoptOptions

Bases: BaseModel

Defines a pydantic dataclass to pass options to the Ipopt solver.

For more information and documentation on the options, see https://coin-or.github.io/Ipopt/

Note

Not all options are available through this model. Please add options as they are needed and make a pull request.

Source code in desdeo/tools/pyomo_solver_interfaces.py
class IpoptOptions(BaseModel):
    """Pydantic model holding the options that are passed to the Ipopt solver.

    The documentation of Ipopt and its options can be found at
    https://coin-or.github.io/Ipopt/

    Note:
        Only a subset of the options of Ipopt is covered by this model.
        Please add options as they are needed and make a pull request.
    """

    tol: float = Field(
        default=1e-8,
        description="The desired relative convergence tolerance. Defaults to 1e-8.",
    )
    """The desired relative convergence tolerance. Defaults to 1e-8."""

    max_iter: int = Field(
        default=3000,
        description="Maximum number of iterations. Must be >1. Defaults to 3000.",
    )
    """Maximum number of iterations. Must be >1. Defaults to 3000."""

    print_level: int = Field(
        default=5,
        description="The verbosity level of the solver's output. Ranges between 0 and 12. Defaults to 5.",
    )
    """The verbosity level of the solver's output. Ranges between 0 and 12. Defaults to 5."""

max_iter class-attribute instance-attribute

max_iter: int = Field(
    description="Maximum number of iterations. Must be >1. Defaults to 3000.",
    default=3000,
)

Maximum number of iterations. Must be >1. Defaults to 3000.

print_level class-attribute instance-attribute

print_level: int = Field(
    description="The verbosity level of the solver's output. Ranges between 0 and 12. Defaults to 5.",
    default=5,
)

The verbosity level of the solver's output. Ranges between 0 and 12.

tol class-attribute instance-attribute

tol: float = Field(
    description="The desired relative convergence tolerance. Defaults to 1e-8.",
    default=1e-08,
)

The desired relative convergence tolerance. Defaults to 1e-8.

PyomoBonminSolver

Bases: BaseSolver

Creates pyomo solvers that utilize bonmin.

Source code in desdeo/tools/pyomo_solver_interfaces.py
class PyomoBonminSolver(BaseSolver):
    """A pyomo based solver that utilizes bonmin."""

    def __init__(self, problem: Problem, options: BonminOptions | None = _default_bonmin_options):
        """Initializes the solver with a problem and the options passed to Bonmin.

        Suitable for mixed-integer problems. The objective function being minimized
        (target) and the constraint functions must be twice continuously
        differentiable. The solution is exact when the objective functions and
        constraints are convex. If the objective, any of the constraints, or both
        are non-convex, the solution is based on heuristics.

        For more info about bonmin, see: https://www.coin-or.org/Bonmin/

        Note:
            Bonmin must be installed on the system running DESDEO, and its executable
                must be defined in the PATH.

        Args:
            problem (Problem): the problem to be solved.
            options (BonminOptions, optional): options to be passed to the Bonmin solver.
                If `None` is passed, `_default_bonmin_options` defined in this source
                file is used. Defaults to `_default_bonmin_options`.

        Raises:
            SolverError: when the problem is not twice differentiable.
        """
        if not problem.is_twice_differentiable:
            raise SolverError("Problem must be twice differentiable.")

        self.problem = problem
        self.evaluator = PyomoEvaluator(problem)
        self.options = _default_bonmin_options if options is None else options

        # The import suffix on the model requests the dual values from Bonmin
        self.evaluator.model.dual = pyomo.Suffix(direction=pyomo.Suffix.IMPORT)

    def solve(self, target: str) -> SolverResults:
        """Solves the problem for a given target with Bonmin.

        Args:
            target (str): the symbol of the objective function to be optimized.

        Returns:
            SolverResults: the results of the optimization.
        """
        self.evaluator.set_optimization_target(target)

        bonmin = pyomo.SolverFactory("bonmin", tee=True)

        # pass the options to the solver in the format expected by Bonmin
        for option_name, option_value in self.options.asdict().items():
            bonmin.options[option_name] = option_value

        solver_output = bonmin.solve(self.evaluator.model)

        return parse_pyomo_optimizer_results(solver_output, self.problem, self.evaluator)

__init__

__init__(
    problem: Problem,
    options: BonminOptions | None = _default_bonmin_options,
)

The solver is initialized with a problem and solver options.

Suitable for mixed-integer problems. The objective function being minimized (target) and the constraint functions must be twice continuously differentiable. When the objective functions and constraints are convex, the solution is exact. When the objective or any of the constraints, or both, are non-convex, then the solution is based on heuristics.

For more info about bonmin, see: https://www.coin-or.org/Bonmin/

Note

Bonmin must be installed on the system running DESDEO, and its executable must be defined in the PATH.

Parameters:

Name Type Description Default
problem Problem

the problem to be solved.

required
options BonminOptions

options to be passed to the Bonmin solver. If None is passed, _default_bonmin_options defined in this source file is used.

_default_bonmin_options
Source code in desdeo/tools/pyomo_solver_interfaces.py
def __init__(self, problem: Problem, options: BonminOptions | None = _default_bonmin_options):
    """Initializes the solver with a problem and the options passed to Bonmin.

    Suitable for mixed-integer problems. The objective function being minimized
    (target) and the constraint functions must be twice continuously
    differentiable. The solution is exact when the objective functions and
    constraints are convex. If the objective, any of the constraints, or both
    are non-convex, the solution is based on heuristics.

    For more info about bonmin, see: https://www.coin-or.org/Bonmin/

    Note:
        Bonmin must be installed on the system running DESDEO, and its executable
            must be defined in the PATH.

    Args:
        problem (Problem): the problem to be solved.
        options (BonminOptions, optional): options to be passed to the Bonmin solver.
            If `None` is passed, `_default_bonmin_options` defined in this source
            file is used. Defaults to `_default_bonmin_options`.

    Raises:
        SolverError: when the problem is not twice differentiable.
    """
    if not problem.is_twice_differentiable:
        raise SolverError("Problem must be twice differentiable.")

    self.problem = problem
    self.evaluator = PyomoEvaluator(problem)
    self.options = _default_bonmin_options if options is None else options

    # The import suffix on the model requests the dual values from Bonmin
    self.evaluator.model.dual = pyomo.Suffix(direction=pyomo.Suffix.IMPORT)

solve

solve(target: str) -> SolverResults

Solve the problem for a given target.

Parameters:

Name Type Description Default
target str

the symbol of the objective function to be optimized.

required

Returns:

Name Type Description
SolverResults SolverResults

the results of the optimization.

Source code in desdeo/tools/pyomo_solver_interfaces.py
def solve(self, target: str) -> SolverResults:
    """Optimize the problem with Bonmin for the given target.

    Args:
        target (str): the symbol of the objective function to be optimized.

    Returns:
        SolverResults: the results of the optimization.
    """
    self.evaluator.set_optimization_target(target)

    bonmin = pyomo.SolverFactory("bonmin", tee=True)

    # copy every configured option onto the solver instance
    solver_options = self.options.asdict()
    for name in solver_options:
        bonmin.options[name] = solver_options[name]

    raw_results = bonmin.solve(self.evaluator.model)
    return parse_pyomo_optimizer_results(raw_results, self.problem, self.evaluator)

PyomoCBCSolver

Bases: BaseSolver

Create a pyomo solver that utilizes CBC.

Source code in desdeo/tools/pyomo_solver_interfaces.py
class PyomoCBCSolver(BaseSolver):
    """Create a pyomo solver that utilizes CBC."""

    def __init__(self, problem: Problem, options: CbcOptions | None = _default_cbc_options):
        """Set up a CBC-backed solver for the given problem.

        Suitable for combinatorial and large-scale mixed-integer linear problems.

        For more information, see https://github.com/coin-or/Cbc

        Note:
            CBC must be installed on the system running DESDEO, and its executable
                must be defined in the PATH.

        Args:
            problem (Problem): the problem being solved.
            options (CbcOptions, optional): options to be passed to the CBC solver.
                Passing `None` selects `_default_cbc_options` defined in this
                source file, which is also the default value of this argument.

        Raises:
            SolverError: if the problem is not linear.
        """
        if not problem.is_linear:
            raise SolverError("Nonlinear problems not supported.")

        self.problem = problem
        self.evaluator = PyomoEvaluator(problem)
        self.options = _default_cbc_options if options is None else options

    def solve(self, target: str) -> SolverResults:
        """Optimize the problem with CBC for the given target.

        Args:
            target (str): the symbol of the objective function to be optimized.

        Returns:
            SolverResults: results of the optimization.
        """
        self.evaluator.set_optimization_target(target)

        # the options model is dumped to a plain dict and handed to the factory
        cbc = pyomo.SolverFactory("cbc", tee=True, options=self.options.model_dump())
        raw_results = cbc.solve(self.evaluator.model)

        return parse_pyomo_optimizer_results(raw_results, self.problem, self.evaluator)

__init__

__init__(
    problem: Problem,
    options: CbcOptions | None = _default_cbc_options,
)

The solver is initialized with a problem and solver options.

Suitable for combinatorial and large-scale mixed-integer linear problems.

For more information, see https://github.com/coin-or/Cbc

Note

CBC must be installed on the system running DESDEO, and its executable must be defined in the PATH.

Parameters:

Name Type Description Default
problem Problem

the problem being solved.

required
options CbcOptions

options to be passed to the CBC solver. If None is passed, defaults to _default_cbc_options defined in this source file. Defaults to None.

_default_cbc_options
Source code in desdeo/tools/pyomo_solver_interfaces.py
def __init__(self, problem: Problem, options: CbcOptions | None = _default_cbc_options):
    """Set up a CBC-backed solver for the given problem.

    Suitable for combinatorial and large-scale mixed-integer linear problems.

    For more information, see https://github.com/coin-or/Cbc

    Note:
        CBC must be installed on the system running DESDEO, and its executable
            must be defined in the PATH.

    Args:
        problem (Problem): the problem being solved.
        options (CbcOptions, optional): options to be passed to the CBC solver.
            Passing `None` selects `_default_cbc_options` defined in this
            source file, which is also the default value of this argument.

    Raises:
        SolverError: if the problem is not linear.
    """
    if not problem.is_linear:
        raise SolverError("Nonlinear problems not supported.")

    self.problem = problem
    self.evaluator = PyomoEvaluator(problem)
    self.options = _default_cbc_options if options is None else options

solve

solve(target: str) -> SolverResults

Solve the problem for a given target.

Parameters:

Name Type Description Default
target str

the symbol of the objective function to be optimized.

required

Returns:

Name Type Description
SolverResults SolverResults

results of the Optimization.

Source code in desdeo/tools/pyomo_solver_interfaces.py
def solve(self, target: str) -> SolverResults:
    """Optimize the problem with CBC for the given target.

    Args:
        target (str): the symbol of the objective function to be optimized.

    Returns:
        SolverResults: results of the optimization.
    """
    self.evaluator.set_optimization_target(target)

    # the options model is dumped to a plain dict and handed to the factory
    cbc = pyomo.SolverFactory("cbc", tee=True, options=self.options.model_dump())
    raw_results = cbc.solve(self.evaluator.model)

    return parse_pyomo_optimizer_results(raw_results, self.problem, self.evaluator)

PyomoGurobiSolver

Bases: BaseSolver

Creates a pyomo solver that utilizes Gurobi.

Source code in desdeo/tools/pyomo_solver_interfaces.py
class PyomoGurobiSolver(BaseSolver):
    """Creates a pyomo solver that utilizes Gurobi."""

    def __init__(self, problem: Problem, options: dict[str, any] | None = None):
        """Creates a pyomo solver that utilizes gurobi.

        You need to have gurobi installed on your system for this to work.

        Suitable for solving mixed-integer linear and quadratic optimization
        problems.

        Args:
            problem (Problem): the problem to be solved.
            options (dict[str, any] | None, optional): Dictionary of Gurobi
                parameters to set. This is passed to pyomo as is, so it works the
                same as options would for calling pyomo SolverFactory directly.
                See https://www.gurobi.com/documentation/current/refman/parameters.html
                for information on the available options. If `None`, an empty
                dictionary is used. Defaults to `None`.
        """
        self.problem = problem
        self.evaluator = PyomoEvaluator(problem)

        # A fresh dict per instance; `None` is the sentinel for "no options".
        self.options = {} if options is None else options

    def solve(self, target: str) -> SolverResults:
        """Solve the problem for a given target.

        Args:
            target (str): the symbol of the objective function to be optimized.

        Returns:
            SolverResults: the results of the optimization.
        """
        self.evaluator.set_optimization_target(target)

        # The options stored at construction must be forwarded to the solver
        # factory; otherwise user-supplied Gurobi parameters are silently ignored.
        with pyomo.SolverFactory("gurobi", solver_io="python", options=self.options) as opt:
            opt_res = opt.solve(self.evaluator.model)
            return parse_pyomo_optimizer_results(opt_res, self.problem, self.evaluator)

__init__

__init__(
    problem: Problem, options: dict[str, any] | None = None
)

Creates a pyomo solver that utilizes gurobi.

You need to have gurobi installed on your system for this to work.

Suitable for solving mixed-integer linear and quadratic optimization problems.

Parameters:

Name Type Description Default
problem Problem

the problem to be solved.

required
options GurobiOptions

Dictionary of Gurobi parameters to set. This is passed to pyomo as is, so it works the same as options would for calling pyomo SolverFactory directly. See https://www.gurobi.com/documentation/current/refman/parameters.html for information on the available options

None
Source code in desdeo/tools/pyomo_solver_interfaces.py
def __init__(self, problem: Problem, options: dict[str, any] | None = None):
    """Set up a Gurobi-backed pyomo solver for the given problem.

    You need to have gurobi installed on your system for this to work.

    Suitable for solving mixed-integer linear and quadratic optimization
    problems.

    Args:
        problem (Problem): the problem to be solved.
        options (dict[str, any] | None, optional): Dictionary of Gurobi
            parameters to set. This is passed to pyomo as is, so it works the
            same as options would for calling pyomo SolverFactory directly.
            See https://www.gurobi.com/documentation/current/refman/parameters.html
            for information on the available options. If `None`, an empty
            dictionary is stored. Defaults to `None`.
    """
    self.problem = problem
    self.evaluator = PyomoEvaluator(problem)

    # `None` means "no options": store a fresh empty dict in that case
    self.options = {} if options is None else options

solve

solve(target: str) -> SolverResults

Solve the problem for a given target.

Parameters:

Name Type Description Default
target str

the symbol of the objective function to be optimized.

required

Returns:

Name Type Description
SolverResults SolverResults

the results of the optimization.

Source code in desdeo/tools/pyomo_solver_interfaces.py
def solve(self, target: str) -> SolverResults:
    """Solve the problem for a given target.

    Args:
        target (str): the symbol of the objective function to be optimized.

    Returns:
        SolverResults: the results of the optimization.
    """
    self.evaluator.set_optimization_target(target)

    # The options stored at construction must be forwarded to the solver
    # factory; otherwise user-supplied Gurobi parameters are silently ignored.
    with pyomo.SolverFactory("gurobi", solver_io="python", options=self.options) as opt:
        opt_res = opt.solve(self.evaluator.model)
        return parse_pyomo_optimizer_results(opt_res, self.problem, self.evaluator)

PyomoIpoptSolver

Bases: BaseSolver

Create a pyomo solver that utilizes Ipopt.

Source code in desdeo/tools/pyomo_solver_interfaces.py
class PyomoIpoptSolver(BaseSolver):
    """Create a pyomo solver that utilizes Ipopt."""

    def __init__(self, problem: Problem, options: IpoptOptions | None = _default_ipopt_options):
        """Set up an Ipopt-backed solver for the given problem.

        Suitable for non-linear, twice differentiable constrained problems.
        The problem may be convex or non-convex.

        For more information, see https://coin-or.github.io/Ipopt/

        Note:
            Ipopt must be installed on the system running DESDEO, and its executable
                must be defined in the PATH.

        Args:
            problem (Problem): the problem being solved.
            options (IpoptOptions, optional): options to be passed to the Ipopt solver.
                Passing `None` selects `_default_ipopt_options` defined in this
                source file, which is also the default value of this argument.

        Raises:
            SolverError: if the problem is not twice differentiable.
        """
        if not problem.is_twice_differentiable:
            raise SolverError("Problem must be twice differentiable.")

        self.problem = problem
        self.evaluator = PyomoEvaluator(problem)
        self.options = _default_ipopt_options if options is None else options

        # Attaching an import suffix named 'dual' asks the solver to report dual values
        self.evaluator.model.dual = pyomo.Suffix(direction=pyomo.Suffix.IMPORT)

    def solve(self, target: str) -> SolverResults:
        """Optimize the problem with Ipopt for the given target.

        Args:
            target (str): the symbol of the objective function to be optimized.

        Returns:
            SolverResults: results of the optimization.
        """
        self.evaluator.set_optimization_target(target)

        # the options model is dumped to a plain dict and handed to the factory
        ipopt = pyomo.SolverFactory("ipopt", tee=True, options=self.options.model_dump())
        raw_results = ipopt.solve(self.evaluator.model)

        return parse_pyomo_optimizer_results(raw_results, self.problem, self.evaluator)

__init__

__init__(
    problem: Problem,
    options: IpoptOptions | None = _default_ipopt_options,
)

The solver is initialized with a problem and solver options.

Suitable for non-linear, twice differentiable constrained problems. The problem may be convex or non-convex.

For more information, see https://coin-or.github.io/Ipopt/

Note

Ipopt must be installed on the system running DESDEO, and its executable must be defined in the PATH.

Parameters:

Name Type Description Default
problem Problem

the problem being solved.

required
options IpoptOptions

options to be passed to the Ipopt solver. If None is passed, defaults to _default_ipopt_options defined in this source file. Defaults to None.

_default_ipopt_options
Source code in desdeo/tools/pyomo_solver_interfaces.py
def __init__(self, problem: Problem, options: IpoptOptions | None = _default_ipopt_options):
    """Set up an Ipopt-backed solver for the given problem.

    Suitable for non-linear, twice differentiable constrained problems.
    The problem may be convex or non-convex.

    For more information, see https://coin-or.github.io/Ipopt/

    Note:
        Ipopt must be installed on the system running DESDEO, and its executable
            must be defined in the PATH.

    Args:
        problem (Problem): the problem being solved.
        options (IpoptOptions, optional): options to be passed to the Ipopt solver.
            Passing `None` selects `_default_ipopt_options` defined in this
            source file, which is also the default value of this argument.

    Raises:
        SolverError: if the problem is not twice differentiable.
    """
    if not problem.is_twice_differentiable:
        raise SolverError("Problem must be twice differentiable.")

    self.problem = problem
    self.evaluator = PyomoEvaluator(problem)
    self.options = _default_ipopt_options if options is None else options

    # Attaching an import suffix named 'dual' asks the solver to report dual values
    self.evaluator.model.dual = pyomo.Suffix(direction=pyomo.Suffix.IMPORT)

solve

solve(target: str) -> SolverResults

Solve the problem for a given target.

Parameters:

Name Type Description Default
target str

the symbol of the objective function to be optimized.

required

Returns:

Name Type Description
SolverResults SolverResults

results of the Optimization.

Source code in desdeo/tools/pyomo_solver_interfaces.py
def solve(self, target: str) -> SolverResults:
    """Optimize the problem with Ipopt for the given target.

    Args:
        target (str): the symbol of the objective function to be optimized.

    Returns:
        SolverResults: results of the optimization.
    """
    self.evaluator.set_optimization_target(target)

    # the options model is dumped to a plain dict and handed to the factory
    ipopt = pyomo.SolverFactory("ipopt", tee=True, options=self.options.model_dump())
    raw_results = ipopt.solve(self.evaluator.model)

    return parse_pyomo_optimizer_results(raw_results, self.problem, self.evaluator)

parse_pyomo_optimizer_results

parse_pyomo_optimizer_results(
    opt_res: SolverResults,
    problem: Problem,
    evaluator: PyomoEvaluator,
) -> SolverResults

Parses pyomo SolverResults into DESDEO SolverResults.

Parameters:

Name Type Description Default
opt_res SolverResults

the pyomo solver results.

required
problem Problem

the problem being solved.

required
evaluator PyomoEvaluator

the evaluator utilized to get the pyomo solver results.

required

Returns:

Name Type Description
SolverResults SolverResults

DESDEO solver results.

Source code in desdeo/tools/pyomo_solver_interfaces.py
def parse_pyomo_optimizer_results(
    opt_res: _pyomo_SolverResults, problem: Problem, evaluator: PyomoEvaluator
) -> SolverResults:
    """Parses pyomo SolverResults into DESDEO SolverResults.

    Variable, objective, constraint, extra function, and scalarization values
    are read from `evaluator.get_values()`. Tensor variables and multi-valued
    (indexed) constraints are converted into nested lists. Lagrange multipliers
    are collected from the model's `dual` suffix when it is present and
    non-empty.

    Args:
        opt_res (_pyomo_SolverResults): the pyomo solver results.
        problem (Problem): the problem being solved.
        evaluator (PyomoEvaluator): the evaluator utilized to get the pyomo solver results.

    Returns:
        SolverResults: DESDEO solver results.
    """
    results = evaluator.get_values()

    variable_values = {}
    for var in problem.variables:
        if isinstance(var, TensorVariable):
            # Pyomo uses 1-based indices while numpy is 0-based. One-dimensional
            # components are keyed by a bare index instead of a 1-tuple.
            values_list = np.zeros(var.shape)
            for indices in itertools.product(*(range(1, dim + 1) for dim in var.shape)):
                values_list[tuple(idx - 1 for idx in indices)] = results[var.symbol][
                    indices if len(indices) > 1 else indices[0]
                ]
            variable_values[var.symbol] = values_list.tolist()
        else:
            variable_values[var.symbol] = results[var.symbol]

    objective_values = {obj.symbol: results[obj.symbol] for obj in problem.objectives}

    # handle constraints, which might be multi-valued
    if problem.constraints is not None:
        constraint_values = {}

        for con in problem.constraints:
            result = results[con.symbol]

            if not isinstance(result, dict):
                # scalar-valued
                constraint_values[con.symbol] = result
                continue

            # multi-valued
            keys = list(getattr(evaluator.model, con.symbol).keys())
            if not keys:
                # an indexed constraint without members has no values to report
                constraint_values[con.symbol] = []
                continue

            # Normalize keys to tuples: 1-D indexed components use bare indices,
            # which support neither len() nor positional access.
            key_tuples = [key if isinstance(key, tuple) else (key,) for key in keys]
            shape = tuple(len({idx[k] for idx in key_tuples}) for k in range(len(key_tuples[0])))
            values_list = np.zeros(shape)

            for key, idx in zip(keys, key_tuples, strict=True):
                # look up with the original key, store at the 0-based position
                values_list[tuple(i - 1 for i in idx)] = result[key]

            constraint_values[con.symbol] = values_list.tolist()
    else:
        constraint_values = None

    extra_func_values = (
        {extra.symbol: results[extra.symbol] for extra in problem.extra_funcs}
        if problem.extra_funcs is not None
        else None
    )
    scalarization_values = (
        {scal.symbol: results[scal.symbol] for scal in problem.scalarization_funcs}
        if problem.scalarization_funcs is not None
        else None
    )

    # Extract Lagrange multipliers on a best-effort basis: missing duals are
    # skipped and reported as None rather than failing the whole parse.
    lagrange_multipliers = None
    if problem.constraints:
        try:
            if hasattr(evaluator.model, "dual") and evaluator.model.dual:
                lagrange_multipliers = {}
                for con in problem.constraints:
                    try:
                        constraint_obj = getattr(evaluator.model, con.symbol)
                        if constraint_obj in evaluator.model.dual:
                            lagrange_multipliers["mu_" + con.symbol] = evaluator.model.dual[constraint_obj]
                    except (AttributeError, KeyError):
                        continue
                if not lagrange_multipliers:
                    lagrange_multipliers = None
        except (AttributeError, KeyError):
            lagrange_multipliers = None

    # success requires both an 'ok' solver status and an 'optimal' termination
    success = (
        opt_res.solver.status == _pyomo_SolverStatus.ok
        and opt_res.solver.termination_condition == _pyomo_TerminationCondition.optimal
    )
    msg = (
        f"Pyomo solver status is: '{opt_res.solver.status}', with termination condition: "
        f"'{opt_res.solver.termination_condition}'."
    )

    return SolverResults(
        optimal_variables=variable_values,
        optimal_objectives=objective_values,
        constraint_values=constraint_values,
        extra_func_values=extra_func_values,
        scalarization_values=scalarization_values,
        lagrange_multipliers=lagrange_multipliers,
        success=success,
        message=msg,
    )

Proximal solver

Defines solvers meant to be utilized with Problems with discrete representations.

ProximalSolver

Bases: BaseSolver

Creates a solver that finds the closest solution given a fully discrete problem.

Note

This solver is extremely naive. It will optimize the problem and the result will be a point defined for a discrete problem that is closest (Euclidean distance) to the optimum. The result may be wildly inaccurate depending on how representative the discrete points are of the original problem.

Source code in desdeo/tools/proximal_solver.py
class ProximalSolver(BaseSolver):
    """Creates a solver that finds the closest solution given a fully discrete problem.

    Note:
        This solver is extremely naive. It will optimize the problem and the result will
            be a point defined for a discrete problem that is closest (Euclidean
            distance) to the optimum. The result may be wildly inaccurate depending on how
            representative the discrete points are of the original problem.
    """

    def __init__(self, problem: Problem, kwargs: dict | None = None):
        """Creates a solver that assumes the problem being a fully discrete one.

        Assumes that problem has only data-based objectives and a discrete definition
        that fully defines all the objectives.

        Args:
            problem (Problem): the problem being solved.
            kwargs (Optional[dict]): optional keyword arguments. Not used right now, but kept
                here for compatibility reasons. Defaults to None.

        Raises:
            SolverError: if any objective is not data-based, or if the problem has
                no discrete representation.
        """
        # report the first objective that is not data-based, if there is one
        offending = next(
            (obj for obj in problem.objectives if obj.objective_type is not ObjectiveTypeEnum.data_based),
            None,
        )
        if offending is not None:
            raise SolverError(f"All objectives must be data-based {offending.symbol}.")

        if problem.discrete_representation is None:
            raise SolverError("Problem must have a discrete representation defined.")

        self.problem = problem
        self.evaluator = PolarsEvaluator(problem, evaluator_mode=PolarsEvaluatorModesEnum.discrete)

    def solve(self, target: str) -> SolverResults:
        """Solve the problem for the given target.

        Rows violating any constraint (value greater than zero) are discarded, and
        the remaining row with the smallest value in the `target` column is returned.

        Args:
            target (str): the symbol of the objective function to be optimized.

        Returns:
            SolverResults: the results of the optimization.
        """
        evaluated = self.evaluator.evaluate()

        # keep only the rows where every constraint is satisfied
        if self.problem.constraints is not None:
            feasible = pl.lit(True)  # noqa: FBT003
            for con in self.problem.constraints:
                feasible = feasible & (evaluated[con.symbol] <= 0)

            evaluated = evaluated.filter(feasible)

        # the first row after sorting by the target column
        best = evaluated.sort(target).head(1)

        def first_values(members):
            # plain dict of symbol -> value in the selected row, easy to jsonify
            return {member.symbol: best[member.symbol][0] for member in members}

        variable_values = first_values(self.problem.variables)
        objective_values = first_values(self.problem.objectives)

        constraint_values = None
        if self.problem.constraints is not None:
            constraint_values = first_values(self.problem.constraints)

        extra_func_values = None
        if self.problem.extra_funcs is not None:
            extra_func_values = first_values(self.problem.extra_funcs)

        scalarization_values = None
        if self.problem.scalarization_funcs is not None:
            scalarization_values = first_values(self.problem.scalarization_funcs)

        return SolverResults(
            optimal_variables=variable_values,
            optimal_objectives=objective_values,
            constraint_values=constraint_values,
            extra_func_values=extra_func_values,
            scalarization_values=scalarization_values,
            success=True,
            message=f"Optimal value found from tabular data minimizing the column '{target}'.",
        )

__init__

__init__(problem: Problem, kwargs: dict | None = None)

Creates a solver that assumes the problem being a fully discrete one.

Assumes that problem has only data-based objectives and a discrete definition that fully defines all the objectives.

Parameters:

Name Type Description Default
problem Problem

the problem being solved.

required
kwargs Optional[dict]

optional keyword arguments. Not used right now, but kept here for compatibility reasons. Defaults to None.

None
Source code in desdeo/tools/proximal_solver.py
def __init__(self, problem: Problem, kwargs: dict | None = None):
    """Creates a solver that assumes the problem being a fully discrete one.

    Assumes that problem has only data-based objectives and a discrete definition
    that fully defines all the objectives.

    Args:
        problem (Problem): the problem being solved.
        kwargs (Optional[dict]): optional keyword arguments. Not used right now, but kept
            here for compatibility reasons. Defaults to None.

    Raises:
        SolverError: if any objective is not data-based, or if the problem has
            no discrete representation.
    """
    # report the first objective that is not data-based, if there is one
    offending = next(
        (obj for obj in problem.objectives if obj.objective_type is not ObjectiveTypeEnum.data_based),
        None,
    )
    if offending is not None:
        raise SolverError(f"All objectives must be data-based {offending.symbol}.")

    if problem.discrete_representation is None:
        raise SolverError("Problem must have a discrete representation defined.")

    self.problem = problem
    self.evaluator = PolarsEvaluator(problem, evaluator_mode=PolarsEvaluatorModesEnum.discrete)

solve

solve(target: str) -> SolverResults

Solve the problem for the given target.

Parameters:

Name Type Description Default
target str

the symbol of the objective function to be optimized.

required

Returns:

Name Type Description
SolverResults SolverResults

the results of the optimization.

Source code in desdeo/tools/proximal_solver.py
def solve(self, target: str) -> SolverResults:
    """Solve the problem for the given target.

    Rows violating any constraint (value greater than zero) are discarded, and
    the remaining row with the smallest value in the `target` column is returned.

    Args:
        target (str): the symbol of the objective function to be optimized.

    Returns:
        SolverResults: the results of the optimization.
    """
    evaluated = self.evaluator.evaluate()

    # keep only the rows where every constraint is satisfied
    if self.problem.constraints is not None:
        feasible = pl.lit(True)  # noqa: FBT003
        for con in self.problem.constraints:
            feasible = feasible & (evaluated[con.symbol] <= 0)

        evaluated = evaluated.filter(feasible)

    # the first row after sorting by the target column
    best = evaluated.sort(target).head(1)

    def first_values(members):
        # plain dict of symbol -> value in the selected row, easy to jsonify
        return {member.symbol: best[member.symbol][0] for member in members}

    variable_values = first_values(self.problem.variables)
    objective_values = first_values(self.problem.objectives)

    constraint_values = None
    if self.problem.constraints is not None:
        constraint_values = first_values(self.problem.constraints)

    extra_func_values = None
    if self.problem.extra_funcs is not None:
        extra_func_values = first_values(self.problem.extra_funcs)

    scalarization_values = None
    if self.problem.scalarization_funcs is not None:
        scalarization_values = first_values(self.problem.scalarization_funcs)

    return SolverResults(
        optimal_variables=variable_values,
        optimal_objectives=objective_values,
        constraint_values=constraint_values,
        extra_func_values=extra_func_values,
        scalarization_values=scalarization_values,
        success=True,
        message=f"Optimal value found from tabular data minimizing the column '{target}'.",
    )

Publisher-Subscriber pattern

This module contains the classes for the publisher-subscriber (ish) pattern.

The pattern is used in the evolutionary algorithms to send messages between the different components. This allows the components to be decoupled and the messages to be sent between them without the components knowing about each other. The pattern closely resembles the publisher-subscriber pattern, with one key difference. The subscribers can also create messages and send them to the publisher, which then forwards the messages to the other subscribers.

The pattern is implemented with two classes: Subscriber and Publisher. The Subscriber class is an abstract class that must be inherited by the classes that want to receive (or send) messages. All evolutionary operators must inherit the Subscriber class. Some objects that may be interested in the messages, but otherwise unrelated to the evolutionary operators, may also inherit the Subscriber class. Examples of such objects are a logging class, an archive class, or a class that visualizes intermediate results.

The Publisher class is a class that stores the subscribers and forwards the messages to them. The Publisher class is not connected to the evolutionary algorithms and only serves as a message router. As mentioned earlier, the components do not know about each other, and the Publisher class is the only class that knows about all the connections in between components. The user of the evolutionary algorithms is responsible for creating the connections. However, the implementations of the operators do provide default, so called topics that the operator must subscribe to.

The way the pattern works is as follows. Each operator has a do method which is called by the evolutionary algorithm when the operator is to be executed. This method has some default arguments, depending upon the class of the operator. E.g., the do method of the mutation-related classes may take offspring and parents as default arguments, where each is a tuple of decision variables, objectives, and constraints. However, some special mutation operators may require additional inputs. E.g., an adaptive mutation operator may require the current generation number as an input. To provide this additional input, we do not change the signature of the do method.

Instead, we let the mutation operator subscribe to a topic called, e.g., current_generation. The publisher is then responsible for sending the current generation number to the mutation operator whenever the generation number changes. The mutation operator can then update its internal state based on the received generation number.

To be able to send this information, the Publisher class has a method called notify. Operators can call this method to send messages to the subscribers. The idea is to do this at the end of the do method. That way, whenever any operator is executed, it can send messages to the other operators (which have subscribed to the topics).

Note that the operators do not know about the other operators. The subscribers do not know the origin of the messages. This decoupling allows for a more modular design and easier extensibility of the evolutionary algorithms.

Publisher

Class for a publisher that sends messages to subscribers.

The publisher is unconnected from the evolutionary algorithms and only serves as a message router. The subscribers can subscribe to different message keys and receive messages when the publisher receives a message with the corresponding key.

Source code in desdeo/tools/patterns.py
class Publisher:
    """Class for a publisher that sends messages to subscribers.

    The publisher is unconnected from the evolutionary algorithms and only serves as a message router. The subscribers
    can subscribe to different message keys and receive messages when the publisher receives a message with the
    corresponding key.
    """

    def __init__(self) -> None:
        """Initialize a blank publisher."""
        # Topic -> subscribers that are notified when a message with that topic arrives.
        self.subscribers = {}
        # Subscribers registered under "ALL"; they are notified of every message.
        self.global_subscribers = []
        # Topic -> names of the sources that registered themselves as providers of the topic.
        self.registered_topics: dict[MessageTopics, list[str]] = {}

    def subscribe(self, subscriber: Subscriber, topic: MessageTopics) -> None:
        """Store a subscriber for a given message key.

        Whenever the publisher receives a message with the given key, it will notify the subscriber. This method can
        be used to subscribe to multiple topics by calling it multiple times. Moreover, the user can force the
        subscriber to receive all messages by setting the topic to "ALL". Subscribing the same subscriber to the
        same topic more than once results in it being notified once per subscription.

        Args:
            subscriber (Subscriber): the subscriber to notify.
            topic (MessageTopics): the message topic to subscribe to.
                If "ALL", the subscriber is notified of all messages.
        """
        if topic == "ALL":
            self.global_subscribers.append(subscriber)
            return
        self.subscribers.setdefault(topic, []).append(subscriber)

    def auto_subscribe(self, subscriber: Subscriber) -> None:
        """Subscribe a subscriber to every topic listed in its `interested_topics` attribute.

        Whenever the publisher receives a message with one of those topics, it will notify the subscriber.

        Args:
            subscriber (Subscriber): the subscriber to notify.
        """
        for topic in subscriber.interested_topics:
            self.subscribe(subscriber, topic)

    def unsubscribe(self, subscriber: Subscriber, topic: str) -> None:
        """Remove a subscriber from a given message key.

        A topic that nobody has subscribed to is silently ignored.

        Args:
            subscriber (Subscriber): the subscriber to remove.
            topic (str): the key of the message to unsubscribe from. Use "ALL" to remove a global subscription.

        Raises:
            ValueError: if the topic is known (or is "ALL") but the subscriber is not subscribed to it.
        """
        if topic == "ALL":
            self.global_subscribers.remove(subscriber)
            return
        if topic in self.subscribers:
            self.subscribers[topic].remove(subscriber)

    def unsubscribe_multiple(self, subscriber: Subscriber, topics: list[str]) -> None:
        """Remove a subscriber from multiple message keys.

        Args:
            subscriber (Subscriber): the subscriber to remove.
            topics (list[str]): the keys of the messages to unsubscribe from.
        """
        for topic in topics:
            self.unsubscribe(subscriber, topic)

    def force_unsubscribe(self, subscriber: Subscriber) -> None:
        """Remove a subscriber from all message keys, including the global "ALL" subscription.

        Every occurrence of the subscriber is removed, so a subscriber that was subscribed to the same topic
        several times no longer receives any messages afterwards. Unknown subscribers are ignored.

        Args:
            subscriber (Subscriber): the subscriber to remove.
        """
        for listeners in (*self.subscribers.values(), self.global_subscribers):
            # list.remove drops only the first match, hence the loop.
            while subscriber in listeners:
                listeners.remove(subscriber)

    def register_topics(self, topics: list[MessageTopics], source: str) -> None:
        """Register topics provided to the publisher.

        Args:
            topics (list[MessageTopics]): the topics to register.
            source (str): the source of the topics.
        """
        for topic in topics:
            self.registered_topics.setdefault(topic, []).append(source)

    def check_consistency(self) -> tuple[bool, dict[MessageTopics, list[str]]]:
        """Check if all subscribed topics have also been registered by a source.

        Only topic-specific subscriptions are checked; global ("ALL") subscribers are not considered.

        Returns:
            tuple[bool, dict[MessageTopics, list[str]]]: Returns a tuple. The first element is a bool. True if all
                subscribed topics have been registered by a source. False otherwise. The second element is a dictionary
                mapping each unregistered topic to the class names of its subscribers.
        """
        unregistered_topics = {
            topic: [x.__class__.__name__ for x in listeners]
            for topic, listeners in self.subscribers.items()
            if topic not in self.registered_topics
        }
        if unregistered_topics:
            return False, unregistered_topics
        return True, {}

    def relationship_map(self):
        """Make a diagram connecting sources to subscribers based on topics.

        Returns:
            dict: maps the value of each subscribed topic to a list of `(subscriber class name, sources)` tuples,
                one per subscription. `sources` is the list of sources registered for the topic; it is empty
                when no source has registered the topic.
        """
        relationships = {}
        for topic, listeners in self.subscribers.items():
            # A topic may be subscribed to without any source having registered it; report no sources
            # instead of failing with a KeyError.
            sources = self.registered_topics.get(topic, [])
            for subscriber in listeners:
                relationships.setdefault(topic.value, []).append((subscriber.__class__.__name__, sources))
        return relationships

    def notify(self, messages: Sequence[Message] | None) -> None:
        """Notify subscribers of the received message/messages.

        Args:
            messages (Sequence[Message] | None): the messages to send to the subscribers. Each message is a pydantic
                model with a topic, value, and a source. If None, nothing is sent.
        """
        if messages is None:
            return
        for message in messages:
            # Notify global subscribers
            for subscriber in self.global_subscribers:
                subscriber.update(message)
            # Notify subscribers of the given key
            if message.topic in self.subscribers:
                for subscriber in self.subscribers[message.topic]:
                    subscriber.update(message)

__init__

__init__() -> None

Initialize a blank publisher.

Source code in desdeo/tools/patterns.py
def __init__(self) -> None:
    """Create a publisher that has no subscribers and no registered topics."""
    # Topic -> names of the sources that registered the topic.
    self.registered_topics: dict[MessageTopics, list[str]] = {}
    # Subscribers that receive every message regardless of its topic.
    self.global_subscribers = []
    # Topic -> subscribers to notify for that topic.
    self.subscribers = {}

auto_subscribe

auto_subscribe(subscriber: Subscriber) -> None

Store a subscriber for multiple message keys. The subscriber must have the interested_topics attribute.

Whenever the publisher receives a message with the given key, it will notify the subscriber.

Parameters:

Name Type Description Default
subscriber Subscriber

the subscriber to notify.

required
Source code in desdeo/tools/patterns.py
def auto_subscribe(self, subscriber: Subscriber) -> None:
    """Subscribe a subscriber to every topic it declares an interest in.

    The topics are read from the subscriber's `interested_topics` attribute and each one is passed to
    `subscribe` in turn.

    Args:
        subscriber (Subscriber): the subscriber to notify.
    """
    wanted_topics = subscriber.interested_topics
    for wanted in wanted_topics:
        self.subscribe(subscriber, wanted)

check_consistency

check_consistency() -> tuple[
    bool, dict[MessageTopics, list[str]]
]

Check if all subscribed topics have also been registered by a source.

Returns:

Type Description
tuple[bool, dict[MessageTopics, list[str]]]

tuple[bool, dict[MessageTopics, list[str]]]: Returns a tuple. The first element is a bool. True if all subscribed topics have been registered by a source. False otherwise. The second element is a dictionary of unregistered topics that have been subscribed to.

Source code in desdeo/tools/patterns.py
def check_consistency(self) -> tuple[bool, dict[MessageTopics, list[str]]]:
    """Report subscribed topics that no source has registered.

    Returns:
        tuple[bool, dict[MessageTopics, list[str]]]: a flag and a dictionary. The flag is True when every
            subscribed topic has been registered by a source, False otherwise. The dictionary maps each
            subscribed-but-unregistered topic to the class names of its subscribers (empty when the flag is True).
    """
    missing = {
        topic: [listener.__class__.__name__ for listener in listeners]
        for topic, listeners in self.subscribers.items()
        if topic not in self.registered_topics
    }
    return (not missing), missing

force_unsubscribe

force_unsubscribe(subscriber: Subscriber) -> None

Remove a subscriber from all message keys.

Parameters:

Name Type Description Default
subscriber Subscriber

the subscriber to remove.

required
Source code in desdeo/tools/patterns.py
def force_unsubscribe(self, subscriber: Subscriber) -> None:
    """Remove a subscriber from all message keys, including the global "ALL" subscription.

    Every occurrence of the subscriber is removed, so a subscriber that was subscribed to the same topic
    several times no longer receives any messages afterwards. Unknown subscribers are ignored.

    Args:
        subscriber (Subscriber): the subscriber to remove.
    """
    for listeners in (*self.subscribers.values(), self.global_subscribers):
        # list.remove drops only the first match, hence the loop.
        while subscriber in listeners:
            listeners.remove(subscriber)

notify

notify(messages: Sequence[Message] | None) -> None

Notify subscribers of the received message/messages.

Parameters:

Name Type Description Default
messages Sequence[BaseMessage]

the messages to send to the subscribers. Each message is a pydantic model with a topic, value, and a source.

required
Source code in desdeo/tools/patterns.py
def notify(self, messages: Sequence[Message] | None) -> None:
    """Forward each received message to the interested subscribers.

    For every message, the global subscribers are updated first, followed by the subscribers of the
    message's topic (if any).

    Args:
        messages (Sequence[Message] | None): the messages to send to the subscribers. Each message is a pydantic
            model with a topic, value, and a source. Nothing happens when this is None.
    """
    if messages is not None:
        for msg in messages:
            # Subscribers registered under "ALL" see every message.
            for listener in self.global_subscribers:
                listener.update(msg)
            # Topic-specific subscribers; topics nobody subscribed to are skipped.
            for listener in self.subscribers.get(msg.topic, ()):
                listener.update(msg)

register_topics

register_topics(
    topics: list[MessageTopics], source: str
) -> None

Register topics provided to the publisher.

Parameters:

Name Type Description Default
topics list[MessageTopics]

the topics to register.

required
source str

the source of the topics.

required
Source code in desdeo/tools/patterns.py
def register_topics(self, topics: list[MessageTopics], source: str) -> None:
    """Record that a source provides the given topics.

    Args:
        topics (list[MessageTopics]): the topics to register.
        source (str): the source of the topics.
    """
    registry = self.registered_topics
    for provided in topics:
        # Create the source list on first registration, then append to it.
        registry.setdefault(provided, []).append(source)

relationship_map

relationship_map()

Make a diagram connecting sources to subscribers based on topics.

Source code in desdeo/tools/patterns.py
def relationship_map(self):
    """Make a diagram connecting sources to subscribers based on topics.

    Returns:
        dict: maps the value of each subscribed topic to a list of `(subscriber class name, sources)` tuples,
            one per subscription. `sources` is the list of sources registered for the topic; it is empty
            when no source has registered the topic.
    """
    relationships = {}
    for topic, listeners in self.subscribers.items():
        # A topic may be subscribed to without any source having registered it; report no sources
        # instead of failing with a KeyError.
        sources = self.registered_topics.get(topic, [])
        for subscriber in listeners:
            relationships.setdefault(topic.value, []).append((subscriber.__class__.__name__, sources))
    return relationships

subscribe

subscribe(
    subscriber: Subscriber, topic: MessageTopics
) -> None

Store a subscriber for a given message key.

Whenever the publisher receives a message with the given key, it will notify the subscriber. This method can be used to subscribe to multiple topics by calling it multiple times. Moreover, the user can force the subscriber to receive all messages by setting the topic to "ALL".

Parameters:

Name Type Description Default
subscriber Subscriber

the subscriber to notify.

required
topic str

the message topic (key in message dictionary) to subscribe to. If "ALL", the subscriber is notified of all messages.

required
Source code in desdeo/tools/patterns.py
def subscribe(self, subscriber: Subscriber, topic: MessageTopics) -> None:
    """Register a subscriber for one message topic.

    The subscriber is notified whenever the publisher receives a message with the given topic. Call this
    method repeatedly to subscribe to several topics. Passing "ALL" as the topic makes the subscriber
    receive every message.

    Args:
        subscriber (Subscriber): the subscriber to notify.
        topic (MessageTopics): the message topic to subscribe to.
            If "ALL", the subscriber is notified of all messages.
    """
    if topic == "ALL":
        self.global_subscribers.append(subscriber)
    else:
        # The first subscription to a topic creates its subscriber list.
        self.subscribers.setdefault(topic, []).append(subscriber)

unsubscribe

unsubscribe(subscriber: Subscriber, topic: str) -> None

Remove a subscriber from a given message key.

Parameters:

Name Type Description Default
subscriber Subscriber

the subscriber to remove.

required
topic str

the key of the message to unsubscribe from.

required
Source code in desdeo/tools/patterns.py
def unsubscribe(self, subscriber: Subscriber, topic: str) -> None:
    """Drop one subscription of a subscriber to a topic.

    A topic that nobody has subscribed to is ignored.

    Args:
        subscriber (Subscriber): the subscriber to remove.
        topic (str): the key of the message to unsubscribe from. "ALL" refers to the global subscription.

    Raises:
        ValueError: if the topic is known (or is "ALL") but the subscriber is not subscribed to it.
    """
    if topic == "ALL":
        listeners = self.global_subscribers
    elif topic in self.subscribers:
        listeners = self.subscribers[topic]
    else:
        return
    listeners.remove(subscriber)

unsubscribe_multiple

unsubscribe_multiple(
    subscriber: Subscriber, topics: list[str]
) -> None

Remove a subscriber from multiple message keys.

Parameters:

Name Type Description Default
subscriber Subscriber

the subscriber to remove.

required
topics list[str]

the keys of the messages to unsubscribe from.

required
Source code in desdeo/tools/patterns.py
def unsubscribe_multiple(self, subscriber: Subscriber, topics: list[str]) -> None:
    """Unsubscribe a subscriber from each of the given topics, in order.

    Each topic is handed to `unsubscribe`.

    Args:
        subscriber (Subscriber): the subscriber to remove.
        topics (list[str]): the keys of the messages to unsubscribe from.
    """
    for key in topics:
        self.unsubscribe(subscriber, key)

Subscriber

Bases: ABC

Base class for both subscriber and message sender.

These are used in the evolutionary algorithms to send messages between the different components. The pattern closely resembles the publisher-subscriber pattern, with one key difference. The subscribers can also create messages and send them to the publisher, which then forwards the messages to the other subscribers.

Source code in desdeo/tools/patterns.py
class Subscriber(ABC):
    """Abstract base for components that both receive and send messages.

    The evolutionary algorithms use these to pass messages between their components. The design follows the
    publisher-subscriber pattern with one twist: a subscriber may also produce messages and hand them to the
    publisher, which forwards them to the other subscribers.
    """

    def __init__(
        self,
        publisher: "Publisher",
        verbosity: int,
    ) -> None:
        """Store the publisher and the verbosity level after validating the latter.

        Args:
            publisher (Publisher): the publisher to send messages to.
            verbosity (int): the verbosity level of the messages. A value of 0 means no messages at all.

        Raises:
            TypeError: if verbosity is not an integer.
            ValueError: if verbosity is negative.
        """
        if isinstance(verbosity, int):
            if verbosity < 0:
                raise ValueError("Verbosity must be a non-negative integer.")
        else:
            raise TypeError("Verbosity must be an integer.")
        self.publisher = publisher
        self.verbosity: int = verbosity

    @property
    @abstractmethod
    def interested_topics(self) -> Sequence[MessageTopics]:
        """Return the topics the subscriber is interested in."""

    @property
    @abstractmethod
    def provided_topics(self) -> dict[int, Sequence[MessageTopics]]:
        """Return the topics the subscriber provides to the publisher, grouped by verbosity level."""

    @abstractmethod
    def state(self) -> Sequence[Message]:
        """Return the state of the subject. This is the list of messages to send to the publisher."""

    @abstractmethod
    def update(self, message: Message) -> None:
        """Update self as a result of messages from the publisher.

        Args:
            message (Message): the message from the publisher. Note that each message is a pydantic model with a topic,
                value, and a source.
        """

    def notify(self) -> None:
        """Send the messages returned by `state` to the publisher.

        Nothing is sent at verbosity 0. The messages are forwarded only when every one of them is an instance
        of the types allowed at the current verbosity level; otherwise none of them are sent.

        Raises:
            ValueError: if the current verbosity level is not a key of `AllowedMessagesAtVerbosity`.
        """
        if self.verbosity not in AllowedMessagesAtVerbosity:
            raise ValueError(f"Verbosity level {self.verbosity} is not allowed.")
        if self.verbosity == 0:
            return

        outgoing = self.state()
        for item in outgoing:
            if not isinstance(item, AllowedMessagesAtVerbosity[self.verbosity]):
                # One disallowed message blocks the whole batch.
                return
        self.publisher.notify(messages=outgoing)

interested_topics abstractmethod property

interested_topics: Sequence[MessageTopics]

Return the topics the subscriber is interested in.

provided_topics abstractmethod property

provided_topics: dict[int, Sequence[MessageTopics]]

Return the topics the subscriber provides to the publisher, grouped by verbosity level.

__init__

__init__(publisher: Publisher, verbosity: int) -> None

Initialize a subscriber.

Parameters:

Name Type Description Default
publisher Publisher

the publisher to send messages to.

required
verbosity int

the verbosity level of the messages. A value of 0 means no messages at all.

required
Source code in desdeo/tools/patterns.py
def __init__(
    self,
    publisher: "Publisher",
    verbosity: int,
) -> None:
    """Store the publisher and the verbosity level after validating the latter.

    Args:
        publisher (Publisher): the publisher to send messages to.
        verbosity (int): the verbosity level of the messages. A value of 0 means no messages at all.

    Raises:
        TypeError: if verbosity is not an integer.
        ValueError: if verbosity is negative.
    """
    if isinstance(verbosity, int):
        if verbosity < 0:
            raise ValueError("Verbosity must be a non-negative integer.")
    else:
        raise TypeError("Verbosity must be an integer.")
    self.publisher = publisher
    self.verbosity: int = verbosity

notify

notify() -> None

Notify the publisher of changes in the subject.

The messages to send (a sequence of message objects) are defined in the state method. The state method can return different messages depending on the verbosity level.

Source code in desdeo/tools/patterns.py
def notify(self) -> None:
    """Send the messages returned by `state` to the publisher.

    Nothing is sent at verbosity 0. The messages are forwarded only when every one of them is an instance
    of the types allowed at the current verbosity level; otherwise none of them are sent.

    Raises:
        ValueError: if the current verbosity level is not a key of `AllowedMessagesAtVerbosity`.
    """
    if self.verbosity not in AllowedMessagesAtVerbosity:
        raise ValueError(f"Verbosity level {self.verbosity} is not allowed.")
    if self.verbosity == 0:
        return

    outgoing = self.state()
    for item in outgoing:
        if not isinstance(item, AllowedMessagesAtVerbosity[self.verbosity]):
            # One disallowed message blocks the whole batch.
            return
    self.publisher.notify(messages=outgoing)

state abstractmethod

state() -> Sequence[Message]

Return the state of the subject. This is the list of messages to send to the publisher.

Source code in desdeo/tools/patterns.py
@abstractmethod
def state(self) -> Sequence[Message]:
    """Return the state of the subject as the list of messages to send to the publisher.

    This method is abstract and must be implemented by concrete subscribers.

    Returns:
        Sequence[Message]: the messages to send to the publisher.
    """

update abstractmethod

update(message: Message) -> None

Update self as a result of messages from the publisher.

Parameters:

Name Type Description Default
message Message

the message from the publisher. Note that each message is a pydantic model with a topic, value, and a source.

required
Source code in desdeo/tools/patterns.py
@abstractmethod
def update(self, message: Message) -> None:
    """Update self as a result of a message from the publisher.

    This method is abstract and must be implemented by concrete subscribers.

    Args:
        message (Message): the message from the publisher. Note that each message is a pydantic model with a topic,
            value, and a source.
    """

createblanksubs

createblanksubs(
    interested_topics: Sequence[MessageTopics],
) -> type[Subscriber]

Create a blank subscriber for testing purposes.

Parameters:

Name Type Description Default
interested_topics list[MessageTopics]

the topics the subscriber is interested in.

required

Returns:

Type Description
type[Subscriber]

type[Subscriber]: the blank subscriber class.

Source code in desdeo/tools/patterns.py
def createblanksubs(interested_topics: Sequence[MessageTopics]) -> type["Subscriber"]:
    """Build a minimal `Subscriber` subclass for testing purposes.

    The returned class records every message it receives and sends whatever has been placed in its
    `messages_to_send` list.

    Args:
        interested_topics (list[MessageTopics]): the topics the subscriber is interested in.

    Returns:
        type[Subscriber]: the blank subscriber class.
    """
    # Captured by the class below; shared by all of its instances.
    topics_of_interest = interested_topics

    class BlankSubscriber(Subscriber):
        """A simple subscriber for testing purposes."""

        def __init__(self, publisher: "Publisher", verbosity: int = 0) -> None:
            """Initialize a subscriber with empty inbound and outbound message lists."""
            super().__init__(publisher, verbosity)
            self.messages_to_send: list[Message] = []
            self.messages_received: list[Message] = []

        @property
        def interested_topics(self) -> Sequence[MessageTopics]:
            """Return the topics the subscriber is interested in."""
            return topics_of_interest

        @property
        def provided_topics(self) -> dict[int, Sequence[MessageTopics]]:
            """Return the topics the subscriber provides to the publisher, grouped by verbosity level."""
            return {0: []}

        def state(self) -> list[Message]:
            """Return the messages queued for sending."""
            return self.messages_to_send

        def update(self, message: Message) -> None:
            """Record the received message."""
            self.messages_received.append(message)

    return BlankSubscriber

Message Topics

Defines the messaging protocol used by the various EMO operators.

Array2DMessage

Bases: BaseMessage

A message containing a 2D array value, such as a population or a set of objectives.

Source code in desdeo/tools/message.py
class Array2DMessage(BaseMessage):
    """A message containing a 2D array value, such as a population or a set of objectives.

    The array is carried as a list of lists of floats in the `value` field.
    """

    value: list[list[float]] = Field(..., description="The array value of the message.")
    """ The array value of the message. """

value class-attribute instance-attribute

value: list[list[float]] = Field(
    ..., description="The array value of the message."
)

The array value of the message.

BaseMessage

Bases: BaseModel

Base class for messages. It carries the topic and the source of the message.

Source code in desdeo/tools/message.py
class BaseMessage(BaseModel):
    """Base class for messages. It carries the topic and the source of the message."""

    topic: MessageTopics = Field(..., description="The topic of the message.")
    """ The topic of the message. """
    source: str = Field(..., description="The source of the message.")
    """ The source of the message. """

source class-attribute instance-attribute

source: str = Field(
    ..., description="The source of the message."
)

The source of the message.

topic class-attribute instance-attribute

topic: MessageTopics = Field(
    ..., description="The topic of the message."
)

The topic of the message.

BoolMessage

Bases: BaseMessage

A message containing a boolean value.

Source code in desdeo/tools/message.py
class BoolMessage(BaseMessage):
    """A message containing a boolean value.

    Extends `BaseMessage` with a boolean `value` field.
    """

    value: bool = Field(..., description="The boolean value of the message.")
    """ The boolean value of the message. """

value class-attribute instance-attribute

value: bool = Field(
    ..., description="The boolean value of the message."
)

The boolean value of the message.

CrossoverMessageTopics

Bases: Enum

Topics for messages related to crossover operators.

Source code in desdeo/tools/message.py
class CrossoverMessageTopics(Enum):
    """Topics for messages related to crossover operators.

    The value of each member is the member's own name as a string.
    """

    TEST = "TEST"
    """ A message topic used only for testing the crossover operators. """
    XOVER_PROBABILITY = "XOVER_PROBABILITY"
    """ The current crossover probability. """
    XOVER_DISTRIBUTION = "XOVER_DISTRIBUTION"
    """ The current crossover distribution index. Primarily used in the SBX crossover. """
    PARENTS = "PARENTS"
    """ The parents selected for crossover. """
    OFFSPRINGS = "OFFSPRINGS"
    """ The offsprings generated from the crossover. """
    ALPHA = "ALPHA"
    """ Alpha parameter used in crossover. """
    LAMBDA = "LAMBDA"
    """ Lambda parameter used in crossover. Primarily used in the bounded exponential xover. """

ALPHA class-attribute instance-attribute

ALPHA = 'ALPHA'

Alpha parameter used in crossover.

LAMBDA class-attribute instance-attribute

LAMBDA = 'LAMBDA'

Lambda parameter used in crossover. Primarily used in the bounded exponential xover.

OFFSPRINGS class-attribute instance-attribute

OFFSPRINGS = 'OFFSPRINGS'

The offsprings generated from the crossover.

PARENTS class-attribute instance-attribute

PARENTS = 'PARENTS'

The parents selected for crossover.

TEST class-attribute instance-attribute

TEST = 'TEST'

A message topic used only for testing the crossover operators.

XOVER_DISTRIBUTION class-attribute instance-attribute

XOVER_DISTRIBUTION = 'XOVER_DISTRIBUTION'

The current crossover distribution index. Primarily used in the SBX crossover.

XOVER_PROBABILITY class-attribute instance-attribute

XOVER_PROBABILITY = 'XOVER_PROBABILITY'

The current crossover probability.

DictMessage

Bases: BaseMessage

A message containing a dictionary value.

Source code in desdeo/tools/message.py
class DictMessage(BaseMessage):
    """A message containing a dictionary value.

    Extends `BaseMessage` with a `value` field that maps string keys to arbitrary values.
    """

    value: dict[str, Any] = Field(
        ..., description="The dictionary value of the message."
    )
    """ The dictionary value of the message. """

value class-attribute instance-attribute

value: dict[str, Any] = Field(
    ..., description="The dictionary value of the message."
)

The dictionary value of the message.

EvaluatorMessageTopics

Bases: Enum

Topics for messages related to evaluator operators.

Source code in desdeo/tools/message.py
class EvaluatorMessageTopics(Enum):
    """Topics for messages related to evaluator operators.

    The value of each member is the member's own name as a string.
    """

    TEST = "TEST"
    """ A message topic used only for testing the evaluator operators. """
    POPULATION = "POPULATION"
    """ The population to evaluate. """
    OUTPUTS = "OUTPUTS"
    """ The outputs of the population. Contains objectives, targets, constraints. """
    OBJECTIVES = "OBJECTIVES"
    """ The true objective values of the population. """
    TARGETS = "TARGETS"
    """ The targets, i.e., objective values seen by the evolutionary operators."""
    CONSTRAINTS = "CONSTRAINTS"
    """ The constraints of the population. """
    VERBOSE_OUTPUTS = "VERBOSE_OUTPUTS"
    """ Same as POPULATION + OUTPUTS."""
    NEW_EVALUATIONS = "NEW_EVALUATIONS"
    """ The number of new evaluations. """

CONSTRAINTS class-attribute instance-attribute

CONSTRAINTS = 'CONSTRAINTS'

The constraints of the population.

NEW_EVALUATIONS class-attribute instance-attribute

NEW_EVALUATIONS = 'NEW_EVALUATIONS'

The number of new evaluations.

OBJECTIVES class-attribute instance-attribute

OBJECTIVES = 'OBJECTIVES'

The true objective values of the population.

OUTPUTS class-attribute instance-attribute

OUTPUTS = 'OUTPUTS'

The outputs of the population. Contains objectives, targets, constraints.

POPULATION class-attribute instance-attribute

POPULATION = 'POPULATION'

The population to evaluate.

TARGETS class-attribute instance-attribute

TARGETS = 'TARGETS'

The targets, i.e., objective values seen by the evolutionary operators.

TEST class-attribute instance-attribute

TEST = 'TEST'

A message topic used only for testing the evaluator operators.

VERBOSE_OUTPUTS class-attribute instance-attribute

VERBOSE_OUTPUTS = 'VERBOSE_OUTPUTS'

Same as POPULATION + OUTPUTS.

FloatMessage

Bases: BaseMessage

A message containing a float value.

Source code in desdeo/tools/message.py
class FloatMessage(BaseMessage):
    """A message containing a float value.

    Extends `BaseMessage` with a float `value` field.
    """

    value: float = Field(..., description="The float value of the message.")
    """ The float value of the message. """

value class-attribute instance-attribute

value: float = Field(
    ..., description="The float value of the message."
)

The float value of the message.

GeneratorMessageTopics

Bases: Enum

Topics for messages related to population generator operators.

Source code in desdeo/tools/message.py
class GeneratorMessageTopics(Enum):
    """Topics for messages related to population generator operators.

    The value of each member is the member's own name as a string.
    """

    TEST = "TEST"
    """ A message topic used only for testing the generator operators. """
    POPULATION = "POPULATION"
    """ The population to evaluate. """
    OUTPUTS = "OUTPUTS"
    """ The outputs of the population generation. Contains objectives, targets, and constraints. """
    OBJECTIVES = "OBJECTIVES"
    """ The true objective values of the population. """
    TARGETS = "TARGETS"
    """ The targets, i.e., objective values seen by the evolutionary operators."""
    CONSTRAINTS = "CONSTRAINTS"
    """ The constraints of the population. """
    VERBOSE_OUTPUTS = "VERBOSE_OUTPUTS"
    """ Same as POPULATION + OUTPUTS. """
    NEW_EVALUATIONS = "NEW_EVALUATIONS"
    """ The number of new evaluations. """

CONSTRAINTS class-attribute instance-attribute

CONSTRAINTS = 'CONSTRAINTS'

The constraints of the population.

NEW_EVALUATIONS class-attribute instance-attribute

NEW_EVALUATIONS = 'NEW_EVALUATIONS'

The number of new evaluations.

OBJECTIVES class-attribute instance-attribute

OBJECTIVES = 'OBJECTIVES'

The true objective values of the population.

OUTPUTS class-attribute instance-attribute

OUTPUTS = 'OUTPUTS'

The outputs of the population generation. Contains objectives, targets, and constraints.

POPULATION class-attribute instance-attribute

POPULATION = 'POPULATION'

The population to evaluate.

TARGETS class-attribute instance-attribute

TARGETS = 'TARGETS'

The targets, i.e., objective values seen by the evolutionary operators.

TEST class-attribute instance-attribute

TEST = 'TEST'

A message topic used only for testing the generator operators.

VERBOSE_OUTPUTS class-attribute instance-attribute

VERBOSE_OUTPUTS = 'VERBOSE_OUTPUTS'

Same as POPULATION + OUTPUTS.

GenericMessage

Bases: BaseMessage

A message containing a generic value.

Source code in desdeo/tools/message.py
class GenericMessage(BaseMessage):
    """A message containing a generic value.

    Extends `BaseMessage` with a `value` field annotated as `Any`.
    """

    value: Any = Field(..., description="The generic value of the message.")
    """ The generic value of the message. """

value class-attribute instance-attribute

value: Any = Field(
    ..., description="The generic value of the message."
)

The generic value of the message.

IntMessage

Bases: BaseMessage

A message containing an integer value.

Source code in desdeo/tools/message.py
class IntMessage(BaseMessage):
    """A message containing an integer value.

    Extends `BaseMessage` with an integer `value` field.
    """

    value: int = Field(..., description="The integer value of the message.")
    """ The integer value of the message. """

value class-attribute instance-attribute

value: int = Field(
    ..., description="The integer value of the message."
)

The integer value of the message.

MutationMessageTopics

Bases: Enum

Topics for messages related to mutation operators.

Source code in desdeo/tools/message.py
class MutationMessageTopics(Enum):
    """Topics for messages related to mutation operators.

    The value of every member is a string identical to the member's name.
    """

    TEST = "TEST"
    """ A message topic used only for testing the mutation operators. """
    MUTATION_PROBABILITY = "MUTATION_PROBABILITY"
    """ The current mutation probability. """
    MUTATION_DISTRIBUTION = "MUTATION_DISTRIBUTION"
    """ The current mutation distribution index. Primary used in the polynomial mutation. """
    OFFSPRING_ORIGINAL = "OFFSPRING_ORIGINAL"
    """ The original offsprings before mutation. """
    OFFSPRINGS = "OFFSPRINGS"
    """ The offsprings after mutation. """
    PARENTS = "PARENTS"
    """ The parents of the offsprings. """

MUTATION_DISTRIBUTION class-attribute instance-attribute

MUTATION_DISTRIBUTION = 'MUTATION_DISTRIBUTION'

The current mutation distribution index. Primarily used in the polynomial mutation.

MUTATION_PROBABILITY class-attribute instance-attribute

MUTATION_PROBABILITY = 'MUTATION_PROBABILITY'

The current mutation probability.

OFFSPRINGS class-attribute instance-attribute

OFFSPRINGS = 'OFFSPRINGS'

The offsprings after mutation.

OFFSPRING_ORIGINAL class-attribute instance-attribute

OFFSPRING_ORIGINAL = 'OFFSPRING_ORIGINAL'

The original offsprings before mutation.

PARENTS class-attribute instance-attribute

PARENTS = 'PARENTS'

The parents of the offsprings.

TEST class-attribute instance-attribute

TEST = 'TEST'

A message topic used only for testing the mutation operators.

NumpyArrayMessage

Bases: BaseMessage

A message containing a numpy array value.

Source code in desdeo/tools/message.py
class NumpyArrayMessage(BaseMessage):
    """A message containing a numpy array value.

    When the ``value`` field is serialized, the array is converted to (nested)
    Python lists with ``ndarray.tolist()``.
    """

    value: np.ndarray = Field(..., description="The numpy array value of the message.")
    """ The numpy array value of the message. """

    # Needed so that the model accepts ``np.ndarray`` as a field type.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    @field_serializer("value")
    def _serialize_value(self, value: np.ndarray) -> list[list[float]]:
        """Serialize the array by converting it with ``tolist()``.

        NOTE(review): the return annotation suggests a 2D array of floats, but
        ``tolist()`` does not enforce any shape or dtype -- confirm against callers.
        """
        return value.tolist()

value class-attribute instance-attribute

value: ndarray = Field(
    ..., description="The numpy array value of the message."
)

The numpy array value of the message.

PolarsDataFrameMessage

Bases: BaseMessage

A message containing a 2D array value, such as a population or a set of objectives.

Source code in desdeo/tools/message.py
class PolarsDataFrameMessage(BaseMessage):
    """A message containing a 2D array value, such as a population or a set of objectives.

    The value is carried as a ``DataFrame`` and is serialized with
    ``DataFrame.to_dict(as_series=False)``.
    """

    value: DataFrame = Field(..., description="The array value of the message.")
    """ The array value of the message. """

    # Needed so that the model accepts ``DataFrame`` as a field type.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    @field_serializer("value")
    def _serialize_value(self, value: DataFrame) -> dict[str, list[int | float]]:
        """Serialize the dataframe to a dict of plain lists (``as_series=False``), keyed by column name.

        NOTE(review): the annotation assumes numeric columns only -- confirm against callers.
        """
        return value.to_dict(as_series=False)

value class-attribute instance-attribute

value: DataFrame = Field(
    ..., description="The array value of the message."
)

The array value of the message.

ReferenceVectorMessageTopics

Bases: Enum

Topics for messages related to the reference vectors.

Source code in desdeo/tools/message.py
class ReferenceVectorMessageTopics(Enum):
    """Topics for messages related to the reference vectors.

    Only a single topic, ``TEST``, is currently defined.
    """

    # NOTE(review): presumably used only for testing, like the TEST members
    # of the other topic enums -- confirm.
    TEST = "TEST"

SelectorMessageTopics

Bases: Enum

Topics for messages related to selector operators.

Source code in desdeo/tools/message.py
class SelectorMessageTopics(Enum):
    """Topics for messages related to selector operators.

    The value of every member is a string identical to the member's name.
    """

    TEST = "TEST"
    """ A message topic used only for testing the selector operators. """
    STATE = "STATE"
    """ The state of the parameters of the selector. """
    INDIVIDUALS = "INDIVIDUALS"
    """ The individuals to select from. """
    OUTPUTS = "OUTPUTS"
    """ The outputs of the individuals. """
    CONSTRAINTS = "CONSTRAINTS"
    """ The constraints of the individuals. """
    SELECTED_INDIVIDUALS = "SELECTED_INDIVIDUALS"
    """ The individuals selected by the selector. """
    SELECTED_OUTPUTS = "SELECTED_OUTPUTS"
    """ The targets of the selected individuals. """
    SELECTED_FITNESS = "SELECTED_FITNESS"
    """ The fitness of the selected individuals. This is the fitness calculated by the selector, not the objectives."""
    SELECTED_VERBOSE_OUTPUTS = "SELECTED_VERBOSE_OUTPUTS"
    """ Same as SELECTED_OUTPUTS + SELECTED_INDIVIDUALS"""
    REFERENCE_VECTORS = "REFERENCE_VECTORS"
    """ The reference vectors used in the selection in decomposition-based EMO algorithms. """

CONSTRAINTS class-attribute instance-attribute

CONSTRAINTS = 'CONSTRAINTS'

The constraints of the individuals.

INDIVIDUALS class-attribute instance-attribute

INDIVIDUALS = 'INDIVIDUALS'

The individuals to select from.

OUTPUTS class-attribute instance-attribute

OUTPUTS = 'OUTPUTS'

The outputs of the individuals.

REFERENCE_VECTORS class-attribute instance-attribute

REFERENCE_VECTORS = 'REFERENCE_VECTORS'

The reference vectors used in the selection in decomposition-based EMO algorithms.

SELECTED_FITNESS class-attribute instance-attribute

SELECTED_FITNESS = 'SELECTED_FITNESS'

The fitness of the selected individuals. This is the fitness calculated by the selector, not the objectives.

SELECTED_INDIVIDUALS class-attribute instance-attribute

SELECTED_INDIVIDUALS = 'SELECTED_INDIVIDUALS'

The individuals selected by the selector.

SELECTED_OUTPUTS class-attribute instance-attribute

SELECTED_OUTPUTS = 'SELECTED_OUTPUTS'

The targets of the selected individuals.

SELECTED_VERBOSE_OUTPUTS class-attribute instance-attribute

SELECTED_VERBOSE_OUTPUTS = 'SELECTED_VERBOSE_OUTPUTS'

Same as SELECTED_OUTPUTS + SELECTED_INDIVIDUALS

STATE class-attribute instance-attribute

STATE = 'STATE'

The state of the parameters of the selector.

TEST class-attribute instance-attribute

TEST = 'TEST'

A message topic used only for testing the selector operators.

StringMessage

Bases: BaseMessage

A message containing a string value.

Source code in desdeo/tools/message.py
class StringMessage(BaseMessage):
    """A message containing a string value.

    The payload is annotated as ``str``.
    """

    # Declared with ``...`` as the default, i.e. no default value is provided for the payload.
    value: str = Field(..., description="The string value of the message.")
    """ The string value of the message. """

value class-attribute instance-attribute

value: str = Field(
    ..., description="The string value of the message."
)

The string value of the message.

TerminatorMessageTopics

Bases: Enum

Topics for messages related to terminator operators.

Source code in desdeo/tools/message.py
class TerminatorMessageTopics(Enum):
    """Topics for messages related to terminator operators.

    The value of every member is a string identical to the member's name.
    """

    TEST = "TEST"
    """ A message topic used only for testing the terminator operators. """
    STATE = "STATE"
    """ The state of the parameters of the terminator. """
    TERMINATION = "TERMINATION"
    """ The value of the termination condition. """
    GENERATION = "GENERATION"
    """ The current generation number. """
    EVALUATION = "EVALUATION"
    """ The current number of evaluations. """
    MAX_GENERATIONS = "MAX_GENERATIONS"
    """ The maximum number of generations. """
    MAX_EVALUATIONS = "MAX_EVALUATIONS"
    """ The maximum number of evaluations. """

EVALUATION class-attribute instance-attribute

EVALUATION = 'EVALUATION'

The current number of evaluations.

GENERATION class-attribute instance-attribute

GENERATION = 'GENERATION'

The current generation number.

MAX_EVALUATIONS class-attribute instance-attribute

MAX_EVALUATIONS = 'MAX_EVALUATIONS'

The maximum number of evaluations.

MAX_GENERATIONS class-attribute instance-attribute

MAX_GENERATIONS = 'MAX_GENERATIONS'

The maximum number of generations.

STATE class-attribute instance-attribute

STATE = 'STATE'

The state of the parameters of the terminator.

TERMINATION class-attribute instance-attribute

TERMINATION = 'TERMINATION'

The value of the termination condition.

TEST class-attribute instance-attribute

TEST = 'TEST'

A message topic used only for testing the terminator operators.

Utilities

General utilities related to solvers.

find_compatible_solvers

find_compatible_solvers(
    problem: Problem,
) -> list[BaseSolver]

Find solvers that are compatible with the problem that is being solved.

Parameters:

Name Type Description Default
problem Problem

The problem being solved.

required

Returns:

Type Description
list[BaseSolver]

list[BaseSolver]: A list of solvers that are compatible with the problem.

Source code in desdeo/tools/utils.py
def find_compatible_solvers(problem: Problem) -> list[BaseSolver]:
    """Collect the solvers that can be used with the given problem.

    A purely data-based problem with a discrete representation and scalar variables
    short-circuits to the proximal solver. Otherwise, every solver whose
    requirements (problem properties, installed executables, licenses) are met
    is added to the result.

    Args:
        problem (Problem): The problem being solved.

    Returns:
        list[BaseSolver]: The constructors of the solvers that are compatible with the problem,
            as stored under the "constructor" key of `available_solvers`.
    """
    # The variable dimension check could also be done by testing whether all variables are
    # Variables instead of TensorVariables: the solvers do not currently distinguish 1D tensors
    # from higher dimensions, because the solvers built on the polars evaluator (the only
    # evaluator limited to scalars and 1D tensors) support only scalar valued variables.
    var_dim = variable_dimension_enumerate(problem)

    only_data_based = all(obj.objective_type == ObjectiveTypeEnum.data_based for obj in problem.objectives)
    discrete_available = problem.discrete_representation is not None

    if only_data_based and discrete_available and var_dim == VariableDimensionEnum.scalar:
        # The proximal solver is the only solver for data-based problems at the moment.
        return [available_solvers["proximal"]["constructor"]]

    compatible = []

    if problem.is_twice_differentiable:
        # Mixed-integer differentiable problems: bonmin, if its executable can be found.
        integer_like = (VariableDomainTypeEnum.integer, VariableDomainTypeEnum.mixed)
        if shutil.which("bonmin") and problem.variable_domain in integer_like:
            compatible.append(available_solvers["pyomo_bonmin"]["constructor"])

        # Continuous differentiable problems: ipopt, if its executable can be found.
        if shutil.which("ipopt") and problem.variable_domain in (VariableDomainTypeEnum.continuous,):
            compatible.append(available_solvers["pyomo_ipopt"]["constructor"])

    # Convex or log-log convex problems.
    if check_cvxpy_suitability(problem):
        compatible.append(available_solvers["cvxpy"]["constructor"])

    if problem.is_linear:
        if check_gurobi_license():
            compatible.append(available_solvers["gurobipy"]["constructor"])
        if shutil.which("gurobi"):  # gurobi has to be installed
            compatible.append(available_solvers["pyomo_gurobi"]["constructor"])
        if shutil.which("cbc"):
            compatible.append(available_solvers["pyomo_cbc"]["constructor"])

    if var_dim == VariableDimensionEnum.scalar:
        # nevergrad and scipy solvers work with all(?) problems with only scalar valued variables
        compatible.extend(
            available_solvers[name]["constructor"] for name in ("nevergrad", "scipy_minimize", "scipy_de")
        )

    return compatible

flip_maximized_objective_values

flip_maximized_objective_values(
    problem: Problem, objective_values: dict[str, float]
) -> dict[str, float]

Flips the objective values if the objective function is to be maximized.

Flips the objective values if the objective function is to be maximized by multiplying the values related to maximized objective functions by -1.

Parameters:

Name Type Description Default
problem Problem

the problem the objective values are related to.

required
objective_values dict[str, float]

the objective values to be flipped.

required

Returns:

Type Description
dict[str, float]

dict[str, float]: the flipped objective values.

Source code in desdeo/tools/utils.py
def flip_maximized_objective_values(problem: Problem, objective_values: dict[str, float]) -> dict[str, float]:
    """Negate the values that belong to maximized objective functions.

    Values of objectives that are to be maximized are multiplied by -1; the values
    of all other objectives are passed through unchanged. Only the symbols of the
    problem's objectives appear in the result.

    Args:
        problem (Problem): the problem the objective values are related to.
        objective_values (dict[str, float]): the objective values to be flipped, keyed
            by objective symbol. Must contain an entry for every objective of the problem.

    Returns:
        dict[str, float]: the flipped objective values, keyed by objective symbol.
    """
    flipped = {}
    for objective in problem.objectives:
        value = objective_values[objective.symbol]
        flipped[objective.symbol] = value * -1 if objective.maximize else value
    return flipped

get_corrected_ideal

get_corrected_ideal(
    problem: Problem,
) -> dict[str, float | None]

Compute the corrected ideal point depending if an objective function is to be maximized or not.

I.e., the ideal point element for objectives to be maximized will be multiplied by -1.

Parameters:

Name Type Description Default
problem Problem

the problem with the ideal point.

required

Raises:

Type Description
ValueError

some of the ideal point components have not been defined for some of the objectives.

Returns:

Type Description
dict[str, float | None]

dict[str, float | None]: a dict with the corrected ideal point, keyed by objective symbol. A ValueError is raised if any ideal value is missing.

Source code in desdeo/tools/utils.py
def get_corrected_ideal(problem: Problem) -> dict[str, float | None]:
    """Return the ideal point with the values of maximized objectives negated.

    Ideal values of objectives that are not maximized are passed through unchanged.

    Args:
        problem (Problem): the problem with the ideal point.

    Raises:
        ValueError: the ideal value of at least one objective is None.

    Returns:
        dict[str, float | None]: the corrected ideal value of each objective, keyed
            by the objective's symbol.
    """
    objectives = problem.objectives

    # Every objective must have an ideal value before anything can be corrected.
    for objective in objectives:
        if objective.ideal is None:
            msg = "Some of the objectives have not a defined ideal value."
            raise ValueError(msg)

    corrected = {}
    for objective in objectives:
        corrected[objective.symbol] = -objective.ideal if objective.maximize else objective.ideal
    return corrected

get_corrected_ideal_and_nadir

get_corrected_ideal_and_nadir(
    problem: Problem,
) -> tuple[
    dict[str, float | None], dict[str, float | None] | None
]

Compute the corrected ideal and nadir points depending if an objective function is to be maximized or not.

I.e., the ideal and nadir point element for objectives to be maximized will be multiplied by -1.

Parameters:

Name Type Description Default
problem Problem

the problem with the ideal and nadir points.

required

Raises:

Type Description
ValueError

some of the ideal or nadir point components have not been defined for some of the objectives.

Returns:

Type Description
tuple[dict[str, float | None], dict[str, float | None] | None]

tuple[dict[str, float | None], dict[str, float | None]]: a dict with the corrected ideal point and a dict with the corrected nadir point, both keyed by objective symbol. A ValueError is raised if any ideal or nadir value is missing.

Source code in desdeo/tools/utils.py
def get_corrected_ideal_and_nadir(problem: Problem) -> tuple[dict[str, float | None], dict[str, float | None] | None]:
    """Return the ideal and nadir points with the values of maximized objectives negated.

    Ideal and nadir values of objectives that are not maximized are passed through unchanged.

    Args:
        problem (Problem): the problem with the ideal and nadir points.

    Raises:
        ValueError: the ideal or the nadir value of at least one objective is None.

    Returns:
        tuple[dict[str, float | None], dict[str, float | None]]: the corrected ideal
            point and the corrected nadir point, in that order, each keyed by
            objective symbol.
    """
    objectives = problem.objectives

    # Both points must be fully defined for every objective.
    fully_defined = all(obj.ideal is not None and obj.nadir is not None for obj in objectives)
    if not fully_defined:
        msg = "Some of the objectives have not a defined ideal or nadir value."
        raise ValueError(msg)

    ideal_point = {}
    nadir_point = {}
    for obj in objectives:
        if obj.maximize:
            ideal_point[obj.symbol] = -obj.ideal
            nadir_point[obj.symbol] = -obj.nadir
        else:
            ideal_point[obj.symbol] = obj.ideal
            nadir_point[obj.symbol] = obj.nadir

    return ideal_point, nadir_point

get_corrected_nadir

get_corrected_nadir(
    problem: Problem,
) -> dict[str, float | None]

Compute the corrected nadir point depending if an objective function is to be maximized or not.

I.e., the nadir point element for objectives to be maximized will be multiplied by -1.

Parameters:

Name Type Description Default
problem Problem

the problem with the nadir points.

required

Raises:

Type Description
ValueError

some of the nadir point components have not been defined for some of the objectives.

Returns:

Type Description
dict[str, float | None]

dict[str, float | None]: a dict with the corrected nadir point, keyed by objective symbol. A ValueError is raised if any nadir value is missing.

Source code in desdeo/tools/utils.py
def get_corrected_nadir(problem: Problem) -> dict[str, float | None]:
    """Return the nadir point with the values of maximized objectives negated.

    Nadir values of objectives that are not maximized are passed through unchanged.

    Args:
        problem (Problem): the problem with the nadir points.

    Raises:
        ValueError: the nadir value of at least one objective is None.

    Returns:
        dict[str, float | None]: the corrected nadir value of each objective, keyed
            by the objective's symbol.
    """
    objectives = problem.objectives

    # Every objective must have a nadir value before anything can be corrected.
    for objective in objectives:
        if objective.nadir is None:
            msg = "Some of the objectives have not a defined nadir value."
            raise ValueError(msg)

    corrected = {}
    for objective in objectives:
        corrected[objective.symbol] = -objective.nadir if objective.maximize else objective.nadir
    return corrected

guess_best_solver

guess_best_solver(problem: Problem) -> BaseSolver

Given a problem, tries to guess the best solver to handle it.

Parameters:

Name Type Description Default
problem Problem

the problem being solved.

required
Note

Needs to be extended as new solvers are implemented.

Returns:

Name Type Description
BaseSolver BaseSolver

a solver class that uses BaseSolver as a base class

Source code in desdeo/tools/utils.py
def guess_best_solver(problem: Problem) -> BaseSolver:
    """Given a problem, tries to guess the best solver to handle it.

    The candidates are checked in a fixed priority order and the first match is returned:

    1. proximal, if all objectives are data-based and a discrete representation exists;
    2. gurobipy, if the problem is linear and `check_gurobi_license()` succeeds;
    3. cvxpy, if `check_cvxpy_suitability(problem)` succeeds;
    4. pyomo_bonmin, if the problem is twice differentiable, has an integer or mixed
       variable domain, and a `bonmin` executable is found;
    5. pyomo_ipopt, if the problem is twice differentiable, has a continuous
       variable domain, and an `ipopt` executable is found;
    6. nevergrad, as the fallback.

    Args:
        problem (Problem): the problem being solved.

    Note:
        Needs to be extended as new solvers are implemented.

    Returns:
        BaseSolver: a solver class that uses BaseSolver as a base class. This is the
            entry stored under the "constructor" key of `available_solvers`.
    """
    # needs to be extended as new solver are implemented

    # check if problem has only data-based objectives
    all_data_based = all(objective.objective_type == ObjectiveTypeEnum.data_based for objective in problem.objectives)

    # check if problem has a discrete definition
    has_discrete = problem.discrete_representation is not None

    # TODO: figure out which solver is best for problems with tensor variables. It seems that e.g. the
    # forest problems don't work with pyomo_cbc, while those types of problems have been ok with gurobipy.
    # A dedicated branch for problems with TensorVariables used to live here behind a disabled (`if False:`)
    # condition; it was unreachable and has been removed. Reintroduce it once the preferred order is known.

    if all_data_based and has_discrete:
        # problem has only data-based objectives and a discrete definition is available
        # guess proximal solver is best
        return available_solvers["proximal"]["constructor"]

    # check if the problem is linear
    if problem.is_linear and check_gurobi_license():
        return available_solvers["gurobipy"]["constructor"]

    # check if the problem is convex or log-log convex
    if check_cvxpy_suitability(problem):
        return available_solvers["cvxpy"]["constructor"]

    # check if the problem is differentiable and if it is mixed integer
    if (
        problem.is_twice_differentiable
        and shutil.which("bonmin")
        and problem.variable_domain
        in [
            VariableDomainTypeEnum.integer,
            VariableDomainTypeEnum.mixed,
        ]
    ):
        return available_solvers["pyomo_bonmin"]["constructor"]

    # check if the problem is differentiable and continuous
    if (
        problem.is_twice_differentiable
        and shutil.which("ipopt")
        and problem.variable_domain in [VariableDomainTypeEnum.continuous]
    ):
        return available_solvers["pyomo_ipopt"]["constructor"]

    # else, guess nevergrad heuristics to be the best
    return available_solvers["nevergrad"]["constructor"]

payoff_table_method

payoff_table_method(
    problem: Problem, solver: BaseSolver = None
) -> tuple[dict[str, float], dict[str, float]]

Solves a representation for the ideal and nadir points for a multiobjective optimization problem.

Parameters:

Name Type Description Default
problem Problem

The problem for which the ideal and nadir are solved.

required
solver BaseSolver

The solver to be used in solving the points. Defaults to None.

None

Returns:

Type Description
tuple[dict[str, float], dict[str, float]]

tuple[dict[str, float], dict[str, float]]: The estimated ideal and nadir points.

Source code in desdeo/tools/utils.py
def payoff_table_method(problem: Problem, solver: BaseSolver = None) -> tuple[dict[str, float], dict[str, float]]:
    """Estimate the ideal and nadir points of a problem from its payoff table.

    Each objective is optimized in turn by solving the target named `<symbol>_min`,
    and the values of all objectives at that solution form one row of the payoff
    table. The ideal point is the diagonal of the table. For each objective, the
    nadir estimate is the column minimum if the objective is maximized and the
    column maximum otherwise.

    Args:
        problem (Problem): The problem for which the ideal and nadir are solved.
        solver (BaseSolver): The solver constructor to be used in solving the points. It is
            called with the problem as its only argument. If None, the solver is picked
            with `guess_best_solver`. Defaults to None.

    Returns:
        tuple[dict[str, float], dict[str, float]]: The estimated ideal and nadir points.
    """
    solver_constructor = guess_best_solver(problem) if solver is None else solver
    solver_instance = solver_constructor(problem)

    objectives = problem.objectives
    n_objectives = len(objectives)
    payoff = np.zeros((n_objectives, n_objectives))

    # Row `row` holds the objective values found when objective `row` is optimized.
    for row, optimized in enumerate(objectives):
        result = solver_instance.solve(f"{optimized.symbol}_min")
        for col, observed in enumerate(objectives):
            payoff[row, col] = result.optimal_objectives[observed.symbol]

    ideal = np.diag(payoff)

    # Column-wise worst value: smallest for maximized objectives, largest otherwise.
    nadir = [
        np.min(payoff[:, col]) if objective.maximize else np.max(payoff[:, col])
        for col, objective in enumerate(objectives)
    ]

    return numpy_array_to_objective_dict(problem, ideal), numpy_array_to_objective_dict(problem, nadir)

repair

repair(
    lower_bounds: dict[str, float],
    upper_bounds: dict[str, float],
) -> Callable[[pl.DataFrame], pl.DataFrame]

Repairs the offspring by clipping the values to be within the specified bounds.

Useful in evolutionary algorithms where offspring may go out of bounds due to crossover or mutation operations.

Parameters:

Name Type Description Default
lower_bounds dict[str, float]

The lower bounds for each variable.

required
upper_bounds dict[str, float]

The upper bounds for each variable.

required

Returns:

Type Description
Callable[[DataFrame], DataFrame]

Callable[[pl.DataFrame], pl.DataFrame]: A function that takes a DataFrame and returns a repaired DataFrame.

Source code in desdeo/tools/utils.py
def repair(lower_bounds: dict[str, float], upper_bounds: dict[str, float]) -> Callable[[pl.DataFrame], pl.DataFrame]:
    """Build a function that clips offspring values to the given variable bounds.

    Useful in evolutionary algorithms where offspring may go out of bounds due to crossover or mutation operations.

    Args:
        lower_bounds (dict[str, float]): The lower bounds for each variable, keyed by column name.
        upper_bounds (dict[str, float]): The upper bounds for each variable, keyed by column name.

    Returns:
        Callable[[pl.DataFrame], pl.DataFrame]: A function that takes a DataFrame and returns a repaired DataFrame.
            Every column of the DataFrame must have an entry in both bound dicts.
    """

    def actual_repair(offspring: pl.DataFrame) -> pl.DataFrame:
        repaired = offspring
        # Clip one column at a time; each column is looked up in both bound dicts by its name.
        for name in offspring.columns:
            clipped = pl.col(name).clip(lower_bound=lower_bounds[name], upper_bound=upper_bounds[name])
            repaired = repaired.with_columns(clipped)
        return repaired

    return actual_repair

Generics

Defines generic classes, functions, and objects utilized in the tools module.

BaseSolver

Bases: ABC

Defines a schema for a solver base class.

Source code in desdeo/tools/generics.py
class BaseSolver(ABC):
    """Defines a schema for a solver base class.

    `solve` is abstract, so subclasses have to implement it.
    """

    evaluator: object
    problem: Problem

    # NOTE(review): `any` in the annotation below is the builtin function, not `typing.Any`.
    def __init__(self, problem: Problem, options: dict[str, any] | None = None):
        """Initializer for the solver.

        Args:
            problem (Problem): The problem for the solver.
            options (dict[str,any]): Dictionary of parameters to set.
                What these should be depends on the solver used. Not used by this
                initializer, which only stores the problem.
        """
        self.problem = problem

    @abstractmethod
    def solve(self, target: str) -> SolverResults:
        """Solves the current problem with the specified target.

        Args:
            target (str): a str representing the symbol of the target function.

        Returns:
            SolverResults: The results of the solver
        """

__init__

__init__(
    problem: Problem, options: dict[str, any] | None = None
)

Initializer for the solver.

Parameters:

Name Type Description Default
problem Problem

The problem for the solver.

required
options dict[str, any]

Dictionary of parameters to set. What these should be depends on the solver used.

None
Source code in desdeo/tools/generics.py
def __init__(self, problem: Problem, options: dict[str, any] | None = None):
    """Initializer for the solver.

    Args:
        problem (Problem): The problem for the solver.
        options (dict[str,any]): Dictionary of parameters to set.
            What these should be depends on the solver used. Not used by this
            initializer, which only stores the problem.
    """
    self.problem = problem

solve abstractmethod

solve(target: str) -> SolverResults

Solves the current problem with the specified target.

Parameters:

Name Type Description Default
target str

a str representing the symbol of the target function.

required

Returns:

Name Type Description
SolverResults SolverResults

The results of the solver

Source code in desdeo/tools/generics.py
@abstractmethod
def solve(self, target: str) -> SolverResults:
    """Solves the current problem with the specified target.

    This method is abstract and has no implementation here.

    Args:
        target (str): a str representing the symbol of the target function.

    Returns:
        SolverResults: The results of the solver
    """

EMOResult

Bases: BaseModel

Defines a schema for a dataclass to store the results of an EMO method.

Source code in desdeo/tools/generics.py
class EMOResult(BaseModel):
    """Defines a schema for a dataclass to store the results of an EMO method.

    Both fields hold polars DataFrames, which are serialized with
    ``DataFrame.to_dict(as_series=False)``.
    """

    # `arbitrary_types_allowed` is needed for the `pl.DataFrame` fields. With
    # `use_attribute_docstrings`, the strings below the fields act as the field descriptions.
    model_config = ConfigDict(arbitrary_types_allowed=True, use_attribute_docstrings=True)

    optimal_variables: pl.DataFrame = Field()
    """The decision vectors of the final population."""
    optimal_outputs: pl.DataFrame = Field()
    """The objective vectors, constraint vectors, extra_funcs, and targets of the final population."""

    @field_serializer("optimal_variables")
    def _serialize_optimal_variables(self, value: pl.DataFrame) -> dict[str, list[int | float]]:
        """Serialize the decision vectors to a dict of plain lists, keyed by column name."""
        return value.to_dict(as_series=False)

    @field_serializer("optimal_outputs")
    def _serialize_optimal_outputs(self, value: pl.DataFrame) -> dict[str, list[int | float]]:
        """Serialize the outputs to a dict of plain lists, keyed by column name."""
        return value.to_dict(as_series=False)

optimal_outputs class-attribute instance-attribute

optimal_outputs: DataFrame = Field()

The objective vectors, constraint vectors, extra_funcs, and targets of the final population.

optimal_variables class-attribute instance-attribute

optimal_variables: DataFrame = Field()

The decision vectors of the final population.

PersistentSolver

Defines a schema for a persistent solver class.

Can be used when reinitializing the solver every time the problem is changed is not practical.

Source code in desdeo/tools/generics.py
class PersistentSolver:
    """Defines a schema for a persistent solver class.

    Can be used when reinitializing the solver every time the problem is changed is not practical.

    Apart from the initializer, every method here consists of a docstring only, so
    calling it on this class does nothing and returns None.
    NOTE(review): concrete solvers are presumably expected to override these methods -- confirm.
    """

    evaluator: object
    problem: Problem

    # NOTE(review): `any` in the annotation below is the builtin function, not `typing.Any`.
    def __init__(self, problem: Problem, options: dict[str, any] | None = None):
        """Initializer for the persistent solver.

        Args:
            problem (Problem): The problem for the solver.
            options (dict[str,any]): Dictionary of parameters to set.
                What these should be depends on the solver used. Not used by this
                initializer, which only stores the problem.
        """
        self.problem = problem

    def add_constraint(self, constraint: Constraint | list[Constraint]):
        """Add a constraint expression to the solver.

        Args:
            constraint (Constraint | list[Constraint]): the constraint function expression,
                or a list of such expressions.
        """

    def add_objective(self, objective: Objective):
        """Adds an objective function expression to the solver.

        Args:
            objective (Objective): an objective function expression to be added.
        """

    def add_scalarization_function(self, scalarization: ScalarizationFunction):
        """Adds a scalarization expression to the solver.

        Args:
            scalarization (ScalarizationFunction): A scalarization function to be added.
        """

    def add_variable(self, variable: Variable):
        """Add a variable to the solver.

        Args:
            variable (Variable): The definition of the variable to be added.
        """

    def remove_constraint(self, symbol: str):
        """Removes a constraint from the solver.

        Args:
            symbol (str): a str representing the symbol of the constraint to be removed.
        """

    def remove_variable(self, symbol: str):
        """Removes a variable from the model.

        Args:
            symbol (str): a str representing the symbol of the variable to be removed.
        """

    def solve(self, target: str) -> SolverResults:
        """Solves the current problem with the specified target.

        Args:
            target (str): a str representing the symbol of the target function.

        Returns:
            SolverResults: The results of the solver
        """

__init__

__init__(
    problem: Problem, options: dict[str, any] | None = None
)

Initializer for the persistent solver.

Parameters:

Name Type Description Default
problem Problem

The problem for the solver.

required
options dict[str, any]

Dictionary of parameters to set. What these should be depends on the solver used.

None
Source code in desdeo/tools/generics.py
def __init__(self, problem: Problem, options: dict[str, Any] | None = None):
    """Initializer for the persistent solver.

    Only ``problem`` is stored on the instance; ``options`` is accepted but not
    used by this initializer.

    Args:
        problem (Problem): The problem for the solver.
        options (dict[str, Any] | None): Dictionary of parameters to set.
            What these should be depends on the solver used. Defaults to None.
    """
    self.problem = problem

add_constraint

add_constraint(constraint: Constraint | list[Constraint])

Add a constraint expression to the solver.

Parameters:

Name Type Description Default
constraint Constraint

the constraint function expression.

required
Source code in desdeo/tools/generics.py
def add_constraint(self, constraint: Constraint | list[Constraint]):
    """Add one or more constraint expressions to the solver.

    Note:
        No implementation is provided here; the body consists of this
        docstring only. Presumably concrete solvers override this method
        (to be confirmed against the subclasses).

    Args:
        constraint (Constraint | list[Constraint]): a single constraint function
            expression, or a list of them.
    """

add_objective

add_objective(objective: Objective)

Adds an objective function expression to the solver.

Parameters:

Name Type Description Default
objective Objective

an objective function expression to be added.

required
Source code in desdeo/tools/generics.py
def add_objective(self, objective: Objective):
    """Adds an objective function expression to the solver.

    Note:
        No implementation is provided here; the body consists of this
        docstring only. Presumably concrete solvers override this method
        (to be confirmed against the subclasses).

    Args:
        objective (Objective): an objective function expression to be added.
    """

add_scalarization_function

add_scalarization_function(
    scalarization: ScalarizationFunction,
)

Adds a scalarization expression to the solver.

Parameters:

Name Type Description Default
scalarization ScalarizationFunction

A scalarization function to be added.

required
Source code in desdeo/tools/generics.py
def add_scalarization_function(self, scalarization: ScalarizationFunction):
    """Adds a scalarization expression to the solver.

    Note:
        No implementation is provided here; the body consists of this
        docstring only. Presumably concrete solvers override this method
        (to be confirmed against the subclasses).

    Args:
        scalarization (ScalarizationFunction): A scalarization function to be added.
    """

add_variable

add_variable(variable: Variable)

Add a variable to the solver.

Parameters:

Name Type Description Default
variable Variable

The definition of the variable to be added.

required
Source code in desdeo/tools/generics.py
def add_variable(self, variable: Variable):
    """Add a variable to the solver.

    Note:
        No implementation is provided here; the body consists of this
        docstring only. Presumably concrete solvers override this method
        (to be confirmed against the subclasses).

    Args:
        variable (Variable): The definition of the variable to be added.
    """

remove_constraint

remove_constraint(symbol: str)

Removes a constraint from the solver.

Parameters:

Name Type Description Default
symbol str

a str representing the symbol of the constraint to be removed.

required
Source code in desdeo/tools/generics.py
def remove_constraint(self, symbol: str):
    """Removes a constraint from the solver.

    Note:
        No implementation is provided here; the body consists of this
        docstring only. Presumably concrete solvers override this method
        (to be confirmed against the subclasses).

    Args:
        symbol (str): a str representing the symbol of the constraint to be removed.
    """

remove_variable

remove_variable(symbol: str)

Removes a variable from the model.

Parameters:

Name Type Description Default
symbol str

a str representing the symbol of the variable to be removed.

required
Source code in desdeo/tools/generics.py
def remove_variable(self, symbol: str):
    """Removes a variable from the solver.

    Note:
        No implementation is provided here; the body consists of this
        docstring only. Presumably concrete solvers override this method
        (to be confirmed against the subclasses).

    Args:
        symbol (str): a str representing the symbol of the variable to be removed.
    """

solve

solve(target: str) -> SolverResults

Solves the current problem with the specified target.

Parameters:

Name Type Description Default
target str

a str representing the symbol of the target function.

required

Returns:

Name Type Description
SolverResults SolverResults

The results of the solver

Source code in desdeo/tools/generics.py
def solve(self, target: str) -> SolverResults:
    """Solves the current problem with the specified target.

    Note:
        No implementation is provided here; the body consists of this
        docstring only, so calling this definition directly returns None
        rather than a ``SolverResults``. Presumably concrete solvers
        override this method (to be confirmed against the subclasses).

    Args:
        target (str): a str representing the symbol of the target function.

    Returns:
        SolverResults: The results of the solver.
    """

SolverError

Bases: Exception

Raised when an error with a solver is encountered.

Source code in desdeo/tools/generics.py
class SolverError(Exception):
    """Raised when an error with a solver is encountered."""

    # Adds no behavior of its own on top of Exception; it only provides a
    # distinct type for solver-related failures.

SolverResults

Bases: BaseModel

Defines a schema for a dataclass to store the results of a solver.

Source code in desdeo/tools/generics.py
class SolverResults(BaseModel):
    """Defines a schema for a dataclass to store the results of a solver."""

    # Required fields (no default): optimal_variables, optimal_objectives,
    # success and message. Every other field defaults to None.

    # Both mappings below are keyed by str; presumably the keys are the symbols
    # of the variables/objectives in the problem -- confirm against the solvers.
    optimal_variables: dict[str, int | float | list] = Field(description="The optimal decision variables found.")
    optimal_objectives: dict[str, float | list[float]] = Field(
        description="The objective function values corresponding to the optimal decision variables found."
    )
    # NOTE(review): the trailing `| Any` means any value passes validation for
    # this field, so the narrower dict member of the union is not enforced --
    # confirm this is intentional.
    constraint_values: dict[str, float | int | list[float] | list] | None | Any = Field(
        description=(
            "The constraint values of the problem. A negative value means the constraint is respected, "
            "a positive one means it has been breached."
        ),
        default=None,
    )
    extra_func_values: dict[str, float | list[float]] | None = Field(
        description=("The extra function values of the problem."), default=None
    )
    scalarization_values: dict[str, float | list[float]] | None = Field(
        description=("The scalarization function values of the problem."), default=None
    )
    lagrange_multipliers: dict[str, float | list[float]] | None = Field(
        description="The Lagrange multipliers of the problem.", default=None
    )
    # Termination status reported by the solver.
    success: bool = Field(description="A boolean flag indicating whether the optimization was successful or not.")
    message: str = Field(description="Description of the cause of termination.")

SCORE Bands

Use the auto_SCORE function to generate the SCORE bands visualization.

This module contains the functions which generate SCORE bands visualizations. It also contains functions to calculate the order and positions of the objective axes, as well as a heatmap of correlation matrix.

To run the SCORE bands visualization, use the score_json function to generate the data for the visualization, and then use the plot_score function to generate the figure. You can also pass the result of score_json to other frontends for visualization.

CustomClusterOptions

Bases: BaseModel

Options for custom clustering provided by the user.

Source code in desdeo/tools/score_bands.py
class CustomClusterOptions(BaseModel):
    """Options for custom clustering provided by the user."""

    # With use_attribute_docstrings=True, pydantic takes the string literal that
    # follows each field as that field's description, so those strings are data,
    # not plain comments.
    model_config = ConfigDict(use_attribute_docstrings=True)

    # Identifier of this option set; defaults to "Custom".
    name: str = Field(default="Custom")
    """Custom user-provided clusters."""
    # Required. `cluster` raises ValueError when the length of this list differs
    # from the number of rows in the data.
    clusters: list[int]
    """List of cluster IDs (one for each solution) indicating the cluster to which each solution belongs."""

clusters instance-attribute

clusters: list[int]

List of cluster IDs (one for each solution) indicating the cluster to which each solution belongs.

name class-attribute instance-attribute

name: str = Field(default='Custom')

Custom user-provided clusters.

DBSCANOptions

Bases: BaseModel

Options for DBSCAN clustering algorithm.

Source code in desdeo/tools/score_bands.py
class DBSCANOptions(BaseModel):
    """Options for DBSCAN clustering algorithm."""

    # With use_attribute_docstrings=True, pydantic takes the string literal that
    # follows each field as that field's description.
    model_config = ConfigDict(use_attribute_docstrings=True)

    # The only field: an identifier defaulting to "DBSCAN". No DBSCAN
    # hyper-parameters are exposed through this model.
    name: str = Field(default="DBSCAN")
    """DBSCAN clustering algorithm."""

name class-attribute instance-attribute

name: str = Field(default='DBSCAN')

DBSCAN clustering algorithm.

DimensionClusterOptions

Bases: BaseModel

Options for clustering by one of the objectives/decision variables.

Source code in desdeo/tools/score_bands.py
class DimensionClusterOptions(BaseModel):
    """Options for clustering by one of the objectives/decision variables."""

    # With use_attribute_docstrings=True, pydantic takes the string literal that
    # follows each field as that field's description.
    model_config = ConfigDict(use_attribute_docstrings=True)

    # Identifier of this option set; defaults to "DimensionCluster".
    name: str = Field(default="DimensionCluster")
    """Clustering by one of the dimensions."""
    # Required. `cluster_by_dimension` raises ValueError when this name is not a
    # column of the data.
    dimension_name: str
    """Dimension to use for clustering."""
    n_clusters: int = Field(default=5)
    """Number of clusters to use. Defaults to 5."""
    # Restricted by the Literal type to the two binning strategies named below.
    kind: Literal["EqualWidth", "EqualFrequency"] = Field(default="EqualWidth")
    """Kind of clustering to use. Either "EqualWidth", which divides the dimension range into equal width intervals,
        or "EqualFrequency", which divides the dimension values into intervals with equal number of solutions.
        Defaults to "EqualWidth"."""

dimension_name instance-attribute

dimension_name: str

Dimension to use for clustering.

kind class-attribute instance-attribute

kind: Literal["EqualWidth", "EqualFrequency"] = Field(
    default="EqualWidth"
)

Kind of clustering to use. Either "EqualWidth", which divides the dimension range into equal width intervals, or "EqualFrequency", which divides the dimension values into intervals with equal number of solutions. Defaults to "EqualWidth".

n_clusters class-attribute instance-attribute

n_clusters: int = Field(default=5)

Number of clusters to use. Defaults to 5.

name class-attribute instance-attribute

name: str = Field(default='DimensionCluster')

Clustering by one of the dimensions.

DistanceFormula

Bases: int, Enum

Distance formulas supported by SCORE bands. See the paper for details.

Source code in desdeo/tools/score_bands.py
class DistanceFormula(int, Enum):
    """Distance formulas supported by SCORE bands. See the paper for details."""

    # The `int` mix-in makes members compare equal to plain integers
    # (FORMULA_1 == 1, FORMULA_2 == 2).
    # In `calculate_axes_positions`, FORMULA_1 maps a correlation c to 1 - c and
    # FORMULA_2 maps it to 1 / (|c| + 1).
    FORMULA_1 = 1
    FORMULA_2 = 2

GMMOptions

Bases: BaseModel

Options for Gaussian Mixture Model clustering algorithm.

Source code in desdeo/tools/score_bands.py
class GMMOptions(BaseModel):
    """Options for Gaussian Mixture Model clustering algorithm."""

    # With use_attribute_docstrings=True, pydantic takes the string literal that
    # follows each field as that field's description.
    model_config = ConfigDict(use_attribute_docstrings=True)

    # Identifier of this option set; defaults to "GMM".
    name: str = Field(default="GMM")
    """Gaussian Mixture Model clustering algorithm."""
    # `cluster` dispatches on this value: "silhouette" and "BIC" each select a
    # different private GMM clustering helper.
    scoring_method: Literal["BIC", "silhouette"] = Field(default="silhouette")
    """Scoring method to use for GMM. Either "BIC" or "silhouette". Defaults to "silhouette".
        This option determines how the number of clusters is chosen."""

name class-attribute instance-attribute

name: str = Field(default='GMM')

Gaussian Mixture Model clustering algorithm.

scoring_method class-attribute instance-attribute

scoring_method: Literal["BIC", "silhouette"] = Field(
    default="silhouette"
)

Scoring method to use for GMM. Either "BIC" or "silhouette". Defaults to "silhouette". This option determines how the number of clusters is chosen.

KMeansOptions

Bases: BaseModel

Options for KMeans clustering algorithm.

Source code in desdeo/tools/score_bands.py
class KMeansOptions(BaseModel):
    """Options for KMeans clustering algorithm."""

    # With use_attribute_docstrings=True, pydantic takes the string literal that
    # follows each field as that field's description.
    model_config = ConfigDict(use_attribute_docstrings=True)

    # Identifier of this option set; defaults to "KMeans".
    name: str = Field(default="KMeans")
    """KMeans clustering algorithm."""
    # Passed to KMeans(n_clusters=...) by `cluster`.
    n_clusters: int = Field(default=5)
    """Number of clusters to use. Defaults to 5."""

n_clusters class-attribute instance-attribute

n_clusters: int = Field(default=5)

Number of clusters to use. Defaults to 5.

name class-attribute instance-attribute

name: str = Field(default='KMeans')

KMeans clustering algorithm.

SCOREBandsConfig

Bases: BaseModel

Configuration options for SCORE bands visualization.

Source code in desdeo/tools/score_bands.py
class SCOREBandsConfig(BaseModel):
    """Configuration options for SCORE bands visualization."""

    # With use_attribute_docstrings=True, pydantic takes the string literal that
    # follows each field as that field's description, so those strings are data,
    # not plain comments. Every field in this model has a default.
    model_config = ConfigDict(use_attribute_docstrings=True)

    # --- Which dimensions are shown and how they are labelled ---
    dimensions: list[str] | None = Field(default=None)
    """List of variable/objective names (i.e., column names in the data) to include in the visualization.
        If None, all columns in the data are used. Defaults to None."""
    descriptive_names: dict[str, str] | None = Field(default=None)
    """Optional dictionary mapping dimensions to descriptive names for display in the visualization.
        If None, the original dimension names are used. Defaults to None."""
    units: dict[str, str] | None = Field(default=None)
    """Optional dictionary mapping dimensions to their units for display in the visualization.
        If None, no units are displayed. Defaults to None."""
    # --- Axis placement and appearance ---
    axis_positions: dict[str, float] | None = Field(default=None)
    """Dictionary mapping objective names to their positions on the axes in the SCORE bands visualization. The first
        objective is at position 0.0, and the last objective is at position 1.0. Use this option if you want to
        manually set the axis positions. If None, the axis positions are calculated automatically based on correlations.
        Defaults to None."""
    axis_colours: dict[str, str] | None = Field(default=None)
    """Optional dictionary to set the colour of the axes corresponding to each objective. The keys should be the
        same as in the 'dimensions' field. The values should be a valid plotly color string. Defaults to None.

        Valid plotly color strings include:
            - A hex string (e.g. '#ff0000')
            - An rgb/rgba string (e.g. 'rgb(255,0,0)')
            - An hsl/hsla string (e.g. 'hsl(0,100%,50%)')
            - An hsv/hsva string (e.g. 'hsv(0,100%,100%)')
            - A named CSS color: see https://plotly.com/python/css-colors/ for a list
    """
    # --- Clustering ---
    highlight_cluster: int | None = Field(default=None)
    """Cluster ID to highlight in the visualization. If None, no cluster is highlighted. Defaults to None.
        If a cluster ID is provided, the corresponding cluster is highlighted in the visualization by having a
        pattern fill in the band.
    """
    # The default is a DBSCANOptions instance created once, when the class is defined.
    clustering_algorithm: ClusteringOptions = Field(
        default=DBSCANOptions(),
    )
    """
    Clustering algorithm to use. Currently supports one of `ClusteringOptions`.
    """
    # --- Automatic axis ordering / spacing ---
    distance_formula: DistanceFormula = Field(default=DistanceFormula.FORMULA_1)
    """Distance formula to use. The value should be 1 or 2. Check the paper for details. Defaults to 1."""
    distance_parameter: float = Field(default=0.05)
    """Change the relative distances between the objective axes. Increase this value if objectives are placed too close
        together. Decrease this value if the objectives are equidistant in a problem with objective clusters. Defaults
        to 0.05."""
    use_absolute_correlations: bool = Field(default=False)
    """Whether to use absolute value of the correlation to calculate the placement of axes. Defaults to False."""
    # --- Optional extra traces and band geometry ---
    include_solutions: bool = Field(default=False)
    """Whether to include individual solutions. Defaults to False. If True, the size of the resulting figure may be
        very large for datasets with many solutions. Moreover, the individual traces are hidden by default, but can be
        viewed interactively in the figure."""
    include_medians: bool = Field(default=False)
    """Whether to include cluster medians. Defaults to False. If True, the median traces are hidden by default, but
        can be viewed interactively in the figure."""
    # NOTE(review): no bounds are declared on this field, so values outside
    # (0, 1] are not rejected by the model itself.
    interval_size: float = Field(default=0.95)
    """The size (as a fraction) of the interval to use for the bands. Defaults to 0.95, meaning that 95% of the
    middle solutions in a cluster will be included in the band. The rest will be considered outliers."""
    scales: dict[str, tuple[float, float]] | None = Field(default=None)
    """Optional dictionary specifying the min and max values for each objective. The keys should be the
        objective names (i.e., column names in the data), and the values should be tuples of (min, max).
        If not provided, the min and max will be calculated from the data."""

axis_colours class-attribute instance-attribute

axis_colours: dict[str, str] | None = Field(default=None)

Optional dictionary to set the colour of the axes corresponding to each objective. The keys should be the same as in the 'dimensions' field. The values should be a valid plotly color string. Defaults to None.

Valid plotly color strings include
  • A hex string (e.g. '#ff0000')
  • An rgb/rgba string (e.g. 'rgb(255,0,0)')
  • An hsl/hsla string (e.g. 'hsl(0,100%,50%)')
  • An hsv/hsva string (e.g. 'hsv(0,100%,100%)')
  • A named CSS color: see https://plotly.com/python/css-colors/ for a list

axis_positions class-attribute instance-attribute

axis_positions: dict[str, float] | None = Field(
    default=None
)

Dictionary mapping objective names to their positions on the axes in the SCORE bands visualization. The first objective is at position 0.0, and the last objective is at position 1.0. Use this option if you want to manually set the axis positions. If None, the axis positions are calculated automatically based on correlations. Defaults to None.

clustering_algorithm class-attribute instance-attribute

clustering_algorithm: ClusteringOptions = Field(
    default=DBSCANOptions()
)

Clustering algorithm to use. Currently supports one of ClusteringOptions.

descriptive_names class-attribute instance-attribute

descriptive_names: dict[str, str] | None = Field(
    default=None
)

Optional dictionary mapping dimensions to descriptive names for display in the visualization. If None, the original dimension names are used. Defaults to None.

dimensions class-attribute instance-attribute

dimensions: list[str] | None = Field(default=None)

List of variable/objective names (i.e., column names in the data) to include in the visualization. If None, all columns in the data are used. Defaults to None.

distance_formula class-attribute instance-attribute

distance_formula: DistanceFormula = Field(default=FORMULA_1)

Distance formula to use. The value should be 1 or 2. Check the paper for details. Defaults to 1.

distance_parameter class-attribute instance-attribute

distance_parameter: float = Field(default=0.05)

Change the relative distances between the objective axes. Increase this value if objectives are placed too close together. Decrease this value if the objectives are equidistant in a problem with objective clusters. Defaults to 0.05.

highlight_cluster class-attribute instance-attribute

highlight_cluster: int | None = Field(default=None)

Cluster ID to highlight in the visualization. If None, no cluster is highlighted. Defaults to None. If a cluster ID is provided, the corresponding cluster is highlighted in the visualization by having a pattern fill in the band.

include_medians class-attribute instance-attribute

include_medians: bool = Field(default=False)

Whether to include cluster medians. Defaults to False. If True, the median traces are hidden by default, but can be viewed interactively in the figure.

include_solutions class-attribute instance-attribute

include_solutions: bool = Field(default=False)

Whether to include individual solutions. Defaults to False. If True, the size of the resulting figure may be very large for datasets with many solutions. Moreover, the individual traces are hidden by default, but can be viewed interactively in the figure.

interval_size class-attribute instance-attribute

interval_size: float = Field(default=0.95)

The size (as a fraction) of the interval to use for the bands. Defaults to 0.95, meaning that 95% of the middle solutions in a cluster will be included in the band. The rest will be considered outliers.

scales class-attribute instance-attribute

scales: dict[str, tuple[float, float]] | None = Field(
    default=None
)

Optional dictionary specifying the min and max values for each objective. The keys should be the objective names (i.e., column names in the data), and the values should be tuples of (min, max). If not provided, the min and max will be calculated from the data.

units class-attribute instance-attribute

units: dict[str, str] | None = Field(default=None)

Optional dictionary mapping dimensions to their units for display in the visualization. If None, no units are displayed. Defaults to None.

use_absolute_correlations class-attribute instance-attribute

use_absolute_correlations: bool = Field(default=False)

Whether to use absolute value of the correlation to calculate the placement of axes. Defaults to False.

SCOREBandsResult

Bases: BaseModel

Pydantic/JSON model for representing SCORE Bands.

Source code in desdeo/tools/score_bands.py
class SCOREBandsResult(BaseModel):
    """Pydantic/JSON model for representing SCORE Bands."""

    # With use_attribute_docstrings=True, pydantic takes the string literal that
    # follows each field as that field's description, so those strings are data,
    # not plain comments.
    model_config = ConfigDict(use_attribute_docstrings=True)

    # All fields are required except cluster_names and cluster_hover_info,
    # which default to None.
    options: SCOREBandsConfig
    """Configuration options used to generate the SCORE bands."""
    ordered_dimensions: list[str]
    """List of variable/objective names (i.e., column names in the data).
        Ordered according to their placement in the SCORE bands visualization."""
    clusters: list[int]
    """List of cluster IDs (one for each solution) indicating the cluster to which each solution belongs."""
    cluster_names: dict[int, str] | None = Field(default=None)
    """Optional dictionary mapping cluster IDs to descriptive names for display in the visualization.
        If None, the cluster IDs themselves are used as names. Defaults to None."""
    cluster_hover_info: dict[int, str] | None = Field(default=None)
    """Optional dictionary mapping cluster IDs to hover information for display in the visualization.
        If None, no additional hover information is displayed. Defaults to None."""
    axis_positions: dict[str, float]
    """Dictionary mapping objective names to their positions on the axes in the SCORE bands visualization. The first
        objective is at position 0.0, and the last objective is at position 1.0."""
    # The three mappings below are keyed by cluster ID.
    bands: dict[int, dict[str, tuple[float, float]]]
    """Dictionary mapping cluster IDs to dictionaries of objective names and their corresponding band
        extremes (min, max)."""
    medians: dict[int, dict[str, float]]
    """Dictionary mapping cluster IDs to dictionaries of objective names and their corresponding median values."""
    cardinalities: dict[int, int]
    """Dictionary mapping cluster IDs to the number of solutions in each cluster."""

axis_positions instance-attribute

axis_positions: dict[str, float]

Dictionary mapping objective names to their positions on the axes in the SCORE bands visualization. The first objective is at position 0.0, and the last objective is at position 1.0.

bands instance-attribute

bands: dict[int, dict[str, tuple[float, float]]]

Dictionary mapping cluster IDs to dictionaries of objective names and their corresponding band extremes (min, max).

cardinalities instance-attribute

cardinalities: dict[int, int]

Dictionary mapping cluster IDs to the number of solutions in each cluster.

cluster_hover_info class-attribute instance-attribute

cluster_hover_info: dict[int, str] | None = Field(
    default=None
)

Optional dictionary mapping cluster IDs to hover information for display in the visualization. If None, no additional hover information is displayed. Defaults to None.

cluster_names class-attribute instance-attribute

cluster_names: dict[int, str] | None = Field(default=None)

Optional dictionary mapping cluster IDs to descriptive names for display in the visualization. If None, the cluster IDs themselves are used as names. Defaults to None.

clusters instance-attribute

clusters: list[int]

List of cluster IDs (one for each solution) indicating the cluster to which each solution belongs.

medians instance-attribute

medians: dict[int, dict[str, float]]

Dictionary mapping cluster IDs to dictionaries of objective names and their corresponding median values.

options instance-attribute

options: SCOREBandsConfig

Configuration options used to generate the SCORE bands.

ordered_dimensions instance-attribute

ordered_dimensions: list[str]

List of variable/objective names (i.e., column names in the data). Ordered according to their placement in the SCORE bands visualization.

_DBSCANClustering

_DBSCANClustering(data: DataFrame) -> np.ndarray

Cluster the data using DBSCAN with silhouette scoring to choose eps.

Source code in desdeo/tools/score_bands.py
def _DBSCANClustering(data: pl.DataFrame) -> np.ndarray:  # noqa: N802
    """Cluster the data using DBSCAN with silhouette scoring to choose eps.

    The data is standardized, DBSCAN (cosine metric, ``min_samples=10``) is run for
    20 evenly spaced ``eps`` values in [0.01, 1], and the labelling with the highest
    cosine silhouette score is returned.

    Args:
        data (pl.DataFrame): The data to cluster, one row per sample.

    Returns:
        np.ndarray: The cluster label of each row. If the silhouette score cannot be
            computed for any ``eps`` value, an array of ones is returned (i.e., all rows
            are placed in a single cluster).
    """
    x = StandardScaler().fit_transform(data.to_numpy())
    best_score = -np.inf
    # Fallback labelling, used when no candidate labelling can be scored.
    best_labels = np.ones(len(data))
    for eps_option in np.linspace(0.01, 1, 20):
        labels = DBSCAN(eps=eps_option, min_samples=10, metric="cosine").fit(x).labels_
        try:
            score = silhouette_score(x, labels, metric="cosine")
        except ValueError:
            # silhouette_score rejects labellings it cannot score (e.g. a single label).
            score = -np.inf
        if score > best_score:
            best_score = score
            best_labels = labels
    return best_labels

_gaussianmixtureclusteringwithBIC

_gaussianmixtureclusteringwithBIC(
    data: DataFrame,
) -> np.ndarray

Cluster the data using Gaussian Mixture Model with BIC scoring.

Source code in desdeo/tools/score_bands.py
def _gaussianmixtureclusteringwithBIC(data: pl.DataFrame) -> np.ndarray:  # noqa: N802
    """Cluster the data using Gaussian Mixture Model with BIC scoring.

    The data is standardized and a Gaussian mixture is fitted for every combination of
    covariance type and number of components (1 up to at most 10, and fewer than the number
    of rows). The model with the lowest Bayesian information criterion (BIC) is used to
    label the data.

    Args:
        data (pl.DataFrame): The data to cluster, one row per sample.

    Returns:
        np.ndarray: The cluster label of each row, as predicted by the selected model.

    Raises:
        ValueError: If no model could be selected, e.g., because the data has fewer than
            two rows and there are therefore no candidate component counts.
    """
    x = StandardScaler().fit_transform(data.to_numpy())
    lowest_bic = np.inf
    best_gmm = None
    n_components_range = range(1, min(11, len(x)))
    cv_types: list[Literal["full", "tied", "diag", "spherical"]] = ["spherical", "tied", "diag", "full"]
    for cv_type in cv_types:
        for n_components in n_components_range:
            # Fit a Gaussian mixture with EM
            gmm = GaussianMixture(n_components=n_components, covariance_type=cv_type)
            gmm.fit(x)
            # Lower BIC is better. Note that `gmm.score` is the mean log-likelihood
            # (higher is better), so it must not be minimized here.
            current_bic = gmm.bic(x)
            if current_bic < lowest_bic:
                lowest_bic = current_bic
                best_gmm = gmm

    if best_gmm is None:
        raise ValueError("No Gaussian mixture model could be selected; at least two data points are required.")
    return best_gmm.predict(x)

_gaussianmixtureclusteringwithsilhouette

_gaussianmixtureclusteringwithsilhouette(
    data: DataFrame,
) -> np.ndarray

Cluster the data using Gaussian Mixture Model with silhouette scoring.

Source code in desdeo/tools/score_bands.py
def _gaussianmixtureclusteringwithsilhouette(data: pl.DataFrame) -> np.ndarray:
    """Cluster the data with Gaussian mixtures, picking the labelling with the best silhouette.

    Every combination of covariance type and component count (1 up to at most 10, and
    fewer than the number of rows) is fitted on the standardized data. The labelling
    with the highest cosine silhouette score is returned; if none can be scored, an
    array of ones is returned.
    """
    scaled = StandardScaler().fit_transform(data.to_numpy())
    top_score, top_labels = -np.inf, np.ones(len(data))
    component_counts = range(1, min(11, len(data)))
    covariance_kinds: list[Literal["full", "tied", "diag", "spherical"]] = ["spherical", "tied", "diag", "full"]
    # Covariance type varies slowest, component count fastest.
    candidates = ((kind, count) for kind in covariance_kinds for count in component_counts)
    for kind, count in candidates:
        # Fit a Gaussian mixture with EM and label the data in one step
        assigned = GaussianMixture(n_components=count, covariance_type=kind).fit_predict(scaled)
        try:
            candidate_score = silhouette_score(scaled, assigned, metric="cosine")
        except ValueError:
            # Labellings that cannot be scored never win.
            candidate_score = -np.inf
        if candidate_score > top_score:
            top_score, top_labels = candidate_score, assigned
    return top_labels

annotated_heatmap

annotated_heatmap(
    correlation_matrix: ndarray,
    col_names: list,
    order: list | ndarray,
) -> go.Figure

Create a heatmap of the correlation matrix. Probably should be named something else.

Parameters:

Name Type Description Default
correlation_matrix ndarray

2-D square array of correlation values between pairs of objectives.

required
col_names List

Objective names.

required
order Union[List, ndarray]

Order in which the objectives are shown in SCORE bands.

required

Returns:

Type Description
Figure

go.Figure: The heatmap

Source code in desdeo/tools/score_bands.py
def annotated_heatmap(correlation_matrix: np.ndarray, col_names: list, order: list | np.ndarray) -> go.Figure:
    """Create a heatmap of the correlation matrix. Probably should be named something else.

    The columns of the heatmap follow ``order`` and the rows follow the reverse of ``order``.
    Values are rounded to two decimal places before being displayed.

    Args:
        correlation_matrix (np.ndarray): 2-D square array of correlation values between pairs of objectives.
        col_names (list): Objective names, in the same order as the rows/columns of ``correlation_matrix``.
        order (list | np.ndarray): Order (as indices into ``col_names``) in which the objectives
            are shown in SCORE bands.

    Returns:
        go.Figure: The heatmap
    """
    # Plain integer positions work for list and array inputs alike, and avoid relying on
    # a labelled (pandas-style) data frame for the reordering.
    positions = [int(i) for i in order]
    x_labels = [col_names[i] for i in positions]
    y_labels = x_labels[::-1]
    # Rows in reversed order, columns in the given order.
    corr = np.asarray(correlation_matrix)[positions[::-1]][:, positions]
    corr = np.rint(corr * 100) / 100  # Round to two decimal places to keep the heatmap readable.
    fig = ff.create_annotated_heatmap(
        corr,
        x=x_labels,
        y=y_labels,
        annotation_text=corr.astype(str),
    )
    fig.update_layout(title="Pearson correlation coefficients")
    return fig

calculate_axes_positions

calculate_axes_positions(
    dimension_order: list[int],
    corr: ndarray,
    dist_parameter: float,
    distance_formula: DistanceFormula = DistanceFormula.FORMULA_1,
) -> np.ndarray

Calculate the position of the axes for the SCORE bands visualization based on correlations.

Parameters:

Name Type Description Default
dimension_order list[int]

Order of the variables to be plotted.

required
corr ndarray

Correlation (pearson) matrix.

required
dist_parameter float

Change the relative distances between the axes. Increase this value if the axes are placed too close together. Decrease this value if the axes are equidistant.

required
distance_formula DistanceFormula

The value should be 1 or 2. Check the paper for details. Defaults to DistanceFormula.FORMULA_1.

FORMULA_1

Returns:

Type Description
ndarray

np.ndarray: Positions of the axes in the range [0, 1].

Source code in desdeo/tools/score_bands.py
def calculate_axes_positions(
    dimension_order: list[int],
    corr: np.ndarray,
    dist_parameter: float,
    distance_formula: DistanceFormula = DistanceFormula.FORMULA_1,
) -> np.ndarray:
    """Compute where each axis is placed in the SCORE bands plot, based on correlations.

    The gap between two neighbouring axes is derived from the correlation of the
    corresponding pair of variables, offset by ``dist_parameter``, and the gaps are
    normalized so that they sum to one.

    Args:
        dimension_order (list[int]): Order of the variables to be plotted.
        corr (np.ndarray): Correlation (pearson) matrix.
        dist_parameter (float): Change the relative distances between the axes. Increase this value if the axes are
            placed too close together. Decrease this value if the axes are equidistant.
        distance_formula (DistanceFormula, optional): The value should be 1 or 2. Check the paper for details.
            Defaults to DistanceFormula.FORMULA_1.

    Returns:
        np.ndarray: Positions of the axes in the range [0, 1].

    Raises:
        ValueError: If ``distance_formula`` is neither of the supported formulas.
    """
    # Index pairs of axes that end up next to each other.
    neighbours = np.asarray(list(itertools.pairwise(dimension_order)))
    neighbour_corr = corr[neighbours[:, 0], neighbours[:, 1]]

    if distance_formula == DistanceFormula.FORMULA_2:
        gaps = 1 / (np.abs(neighbour_corr) + 1)  # Reciprocal for reverse
    elif distance_formula == DistanceFormula.FORMULA_1:
        gaps = 1 - neighbour_corr
    else:
        # Should never reach here
        raise ValueError("distance_formula should be either 1 or 2 (int)")

    gaps = gaps + dist_parameter
    gaps = gaps / sum(gaps)
    # The first axis sits at 0; every following axis is offset by the accumulated gaps.
    return np.cumsum(np.append(0, gaps))

cluster

cluster(
    data: DataFrame, options: ClusteringOptions
) -> np.ndarray

Cluster the data using the specified clustering algorithm and options.

Source code in desdeo/tools/score_bands.py
def cluster(data: pl.DataFrame, options: ClusteringOptions) -> np.ndarray:
    """Assign a cluster label to every row of `data` with the algorithm selected by `options`.

    Args:
        data (pl.DataFrame): The solutions to be clustered, one per row.
        options (ClusteringOptions): Options object whose type selects the clustering algorithm.

    Returns:
        np.ndarray: The cluster label of each row.

    Raises:
        ValueError: If the custom cluster list does not have one entry per row, or if no branch handles `options`.
    """
    if isinstance(options, DimensionClusterOptions):
        return cluster_by_dimension(data, options)

    if isinstance(options, KMeansOptions):
        standardised = StandardScaler().fit_transform(data.to_numpy())
        model = KMeans(n_clusters=options.n_clusters, random_state=0)
        model.fit(standardised)
        return model.labels_

    if isinstance(options, DBSCANOptions):
        return _DBSCANClustering(data)

    if isinstance(options, GMMOptions):
        scoring_method = options.scoring_method
        if scoring_method == "silhouette":
            return _gaussianmixtureclusteringwithsilhouette(data)
        if scoring_method == "BIC":
            return _gaussianmixtureclusteringwithBIC(data)
        # Any other scoring method falls through to the checks below.

    if isinstance(options, CustomClusterOptions):
        if len(options.clusters) == len(data):
            return np.array(options.clusters)
        raise ValueError("Length of custom clusters must match number of solutions in data.")

    raise ValueError(f"Unknown clustering algorithm: {options}")

cluster_by_dimension

cluster_by_dimension(
    data: DataFrame, options: DimensionClusterOptions
) -> np.ndarray

Cluster the data by a specific dimension.

Source code in desdeo/tools/score_bands.py
def cluster_by_dimension(data: pl.DataFrame, options: DimensionClusterOptions) -> np.ndarray:
    """Cluster the data by binning the values of a single dimension.

    Args:
        data (pl.DataFrame): The solutions to be clustered, one per row.
        options (DimensionClusterOptions): Names the dimension to bin (`dimension_name`), the binning strategy
            (`kind`, either "EqualWidth" or "EqualFrequency") and the number of bins (`n_clusters`).

    Returns:
        np.ndarray: The cluster ID of each row. Cluster IDs start at 1.

    Raises:
        ValueError: If the dimension is not a column of `data` or if `kind` is not a supported strategy.
    """
    if options.dimension_name not in data.columns:
        raise ValueError(f"Objective '{options.dimension_name}' not found in data.")

    # Select the dimension column for clustering
    dimension = data[options.dimension_name]

    # Perform clustering based on the specified method
    if options.kind == "EqualWidth":
        min_val: float = dimension.min()
        max_val: float = dimension.max()
        SMALL_VALUE = 1e-8  # noqa: N806
        # Pad the range outwards so that the extreme values fall strictly inside the outer bins. The padding is
        # relative, so its sign must follow the sign of the bound: scaling a negative bound by (1 - eps) would move
        # it towards zero, i.e. *into* the data range.
        lower = min_val * (1 - SMALL_VALUE) if min_val >= 0 else min_val * (1 + SMALL_VALUE)
        upper = max_val * (1 + SMALL_VALUE) if max_val >= 0 else max_val * (1 - SMALL_VALUE)
        thresholds = np.linspace(lower, upper, options.n_clusters + 1)
        labels = np.digitize(dimension.to_numpy(), thresholds)
        # Relative padding cannot widen a bound that is exactly zero; clipping keeps such extreme values in the
        # first/last cluster instead of producing the out-of-range IDs 0 or n_clusters + 1.
        return np.clip(labels, 1, options.n_clusters)  # Cluster IDs start at 1
    if options.kind == "EqualFrequency":
        levels: list[float] = [dimension.quantile(i / options.n_clusters) for i in range(1, options.n_clusters)]
        # Open-ended outer bins: every value lands in one of the n_clusters bins.
        thresholds = [-np.inf, *levels, np.inf]
        return np.digitize(dimension.to_numpy(), thresholds)  # Cluster IDs start at 1
    raise ValueError(f"Unknown clustering kind: {options.kind}")

order_dimensions

order_dimensions(
    data: DataFrame, use_absolute_corr: bool = False
) -> tuple[np.ndarray, list[int]]

Calculate the order of objectives.

Also returns the correlation matrix.

Parameters:

Name Type Description Default
data DataFrame

Data to be visualized.

required
use_absolute_corr bool

Use absolute value of the correlation to calculate order. Defaults to False.

False

Returns:

Name Type Description
tuple tuple[ndarray, list[int]]

The first element is the correlation matrix. The second element is the order of the objectives.

Source code in desdeo/tools/score_bands.py
def order_dimensions(data: pl.DataFrame, use_absolute_corr: bool = False) -> tuple[np.ndarray, list[int]]:
    """Calculate the order of objectives.

    Also returns the correlation matrix.

    Args:
        data (pl.DataFrame): Data to be visualized.
        use_absolute_corr (bool, optional): Use absolute value of the correlation to calculate order. Defaults to False.

    Returns:
        tuple: The first element is the correlation matrix. The second element is the order of the objectives.
    """
    # Convert once; the pairwise loop below would otherwise rebuild the array twice per matrix cell.
    values = data.to_numpy()
    n_dims = len(data.columns)

    # Calculating correlations
    # Pearson's coefficient is used here; it is better than Spearman's in some cases.
    corr = np.asarray(
        [[pearsonr(values[:, i], values[:, j])[0] for j in range(n_dims)] for i in range(n_dims)]
    )

    # axes order: solving TSP
    distances = np.abs(corr) if use_absolute_corr else corr
    # Negated so that strongly correlated dimensions count as "close" and end up next to each other.
    obj_order = solve_tsp(-distances)
    return corr, obj_order

plot_score

plot_score(
    data: DataFrame, result: SCOREBandsResult
) -> go.Figure

Generate the SCORE Bands figure from the SCOREBandsResult data.

Parameters:

Name Type Description Default
data DataFrame

Dataframe of objective values. The column names should be the objective names. Each row should be an objective vector.

required
result SCOREBandsResult

The result containing all relevant data for the SCORE bands visualization.

required

Returns:

Type Description
Figure

go.Figure: The SCORE bands plot.

Source code in desdeo/tools/score_bands.py
def plot_score(data: pl.DataFrame, result: SCOREBandsResult) -> go.Figure:
    """Generate the SCORE Bands figure from the SCOREBandsResult data.

    Args:
        data (pl.DataFrame): Dataframe of objective values. The column names should be the objective names. Each row
            should be an objective vector.
        result (SCOREBandsResult): The result containing all relevant data for the SCORE bands visualization.

    Returns:
        go.Figure: The SCORE bands plot.

    Raises:
        ValueError: If `result.options.scales` is None.
    """
    column_names = result.ordered_dimensions

    clusters = np.sort(np.unique(result.clusters))

    cluster_th = 8  # max number of clusters to use 'Accent' color map with, otherwise use 'tab20'
    # NOTE(review): presumably `cm` is matplotlib.cm, where `get_cmap` is deprecated in recent releases — confirm
    # against the pinned matplotlib version.
    colorscale = (
        cm.get_cmap("Accent", len(clusters)) if len(clusters) <= cluster_th else cm.get_cmap("tab20", len(clusters))
    )
    scales = result.options.scales
    if scales is None:
        raise ValueError("Scales must be provided in the SCOREBandsResult to plot the figure.")

    def to_unit_scale(value, col_name):
        """Map a value (or polars expression) of `col_name` from its provided scale onto the 0-1 axis height."""
        low, high = scales[col_name][0], scales[col_name][1]
        return (value - low) / (high - low)

    def to_original_scale(value, col_name):
        """Inverse of `to_unit_scale`."""
        low, high = scales[col_name][0], scales[col_name][1]
        return value * (high - low) + low

    # Scaling with respect to the provided scales, which may not align with the actual min and max in the data
    # (e.g. if user wants to use fixed scales across multiple visualizations)
    scaled_data = data.with_columns([to_unit_scale(pl.col(col), col) for col in column_names])
    fig = go.Figure()
    fig.update_xaxes(showticklabels=False, showgrid=False, zeroline=False)
    fig.update_yaxes(showticklabels=False, showgrid=False, zeroline=False)
    fig.update_layout(plot_bgcolor="rgba(0,0,0,0)")

    cluster_column_name = "cluster"
    # Avoid overwriting existing column just in case data has a 'cluster' column
    if cluster_column_name in scaled_data.columns:
        cluster_column_name = "cluster_id"
    scaled_data = scaled_data.with_columns(pl.Series(cluster_column_name, result.clusters))

    # Missing entries fall back to the raw column name / an empty unit, mirroring how axis colours and cluster
    # names are treated as optional per-key overrides.
    descriptive_names = {} if result.options.descriptive_names is None else result.options.descriptive_names
    units = {} if result.options.units is None else result.options.units

    axis_x = [result.axis_positions[col_name] for col_name in column_names]

    num_ticks = 6
    heights = np.linspace(0, 1, num_ticks)
    # Add axes
    for col_name, x_pos in zip(column_names, axis_x, strict=True):
        # check if axis_colours is provided, otherwise use black
        current_axis_colour = "black"
        if result.options.axis_colours is not None and col_name in result.options.axis_colours:
            current_axis_colour = result.options.axis_colours[col_name]
        tick_values = np.linspace(scales[col_name][0], scales[col_name][1], num_ticks)
        label_text = [f"{tick:.5g}" for tick in tick_values]
        # Axis lines
        fig.add_scatter(
            x=[x_pos] * num_ticks,
            y=heights,
            text=label_text,
            textposition="middle left",
            mode="markers+lines+text",
            line={"color": current_axis_colour},
            showlegend=False,
            hoverinfo="skip",
        )
        # Column Name
        fig.add_scatter(
            x=[x_pos],
            y=[1.20],
            text=f"{descriptive_names.get(col_name, col_name)}",
            textfont={"size": 20},
            mode="text",
            showlegend=False,
        )
        # Units
        fig.add_scatter(
            x=[x_pos],
            y=[1.10],
            text=f"{units.get(col_name, '')}",
            textfont={"size": 12},
            mode="text",
            showlegend=False,
        )

    reversed_axis_x = axis_x[::-1]
    # Add bands
    for cluster_id in sorted(result.bands.keys()):
        # The colour map yields channel fractions in [0, 1]; CSS-style rgba() strings expect 0-255 channels.
        # cluster_id - 1 is needed as cluster numbering starts at 1.
        red, green, blue = (int(round(255 * float(channel))) for channel in colorscale(cluster_id - 1)[:3])
        band_alpha = 0.6
        highlight = None
        if result.options.highlight_cluster is not None and cluster_id == result.options.highlight_cluster:
            highlight = {"shape": "x"}
        cluster_hovertext = (
            result.cluster_hover_info.get(cluster_id, f"Cluster {cluster_id}")
            if result.cluster_hover_info is not None
            else f"Cluster {cluster_id}"
        )
        color_bands = f"rgba({red}, {green}, {blue}, {band_alpha})"
        color_solutions = f"rgba({red}, {green}, {blue}, 0.3)"

        lows = [to_unit_scale(result.bands[cluster_id][col_name][0], col_name) for col_name in column_names]
        highs = [to_unit_scale(result.bands[cluster_id][col_name][1], col_name) for col_name in column_names]
        medians = [to_unit_scale(result.medians[cluster_id][col_name], col_name) for col_name in column_names]

        current_cluster_name = "Cluster " + str(cluster_id)
        if result.cluster_names is not None and cluster_id in result.cluster_names:
            current_cluster_name = result.cluster_names[cluster_id]
        band_label = f"{int(100 * result.options.interval_size)}% band: {current_cluster_name}"

        # lower bound of the band
        fig.add_scatter(
            x=axis_x,
            y=lows,
            line={"color": color_bands},
            name=band_label,
            mode="lines",
            legendgroup=band_label,
            showlegend=True,
            line_shape="spline",
            hovertext=cluster_hovertext,
            hoverinfo="text",
        )
        # upper bound of the band; "tonexty" fills the area down to the lower bound drawn just before
        fig.add_scatter(
            x=axis_x,
            y=highs,
            line={"color": color_bands},
            name=f"{current_cluster_name}",
            fillcolor=color_bands,
            mode="lines",
            fillpattern=go.scatter.Fillpattern(highlight),
            legendgroup=band_label,
            showlegend=False,
            line_shape="spline",
            fill="tonexty",
            hovertext=cluster_hovertext,
            hoverinfo="text",
        )

        if result.options.include_medians:
            # median
            fig.add_scatter(
                x=axis_x,
                y=medians,
                line={"color": color_bands},
                name=f"Median: {current_cluster_name}",
                mode="lines+markers",
                marker={"line": {"color": "Black", "width": 2}},
                legendgroup=f"Median: {current_cluster_name}",
                showlegend=True,
            )
        # Drawing each solution as a single trace (like how it was done in the past) can make the figure very heavy
        # and make interactions (showing/hiding traces) very laggy.
        # Thus here, we draw all solutions in a cluster as a single trace. Basically we make a huge zig-zag trace for
        # each cluster. E.g. imagine two solutions with obj vals (1, 2, 3) and (4, 5, 6) on three objectives. These
        # become the y values in the parallel coordinates plot in this order (1, 2, 3, 6, 5, 4). Subsets of these points
        # which belong to the same solution are given the same hovertext
        if result.options.include_solutions:
            cluster_solutions = scaled_data.filter(pl.col(cluster_column_name) == cluster_id).select(column_names)

            # Lists are grown in place; rebuilding them by concatenation for every solution is quadratic.
            xs: list = []
            ys: list = []
            hovertexts: list[str] = []
            for row_index, row in enumerate(cluster_solutions.iter_rows()):
                if row_index % 2 == 0:
                    xs.extend(axis_x)
                    ys.extend(row)
                else:
                    xs.extend(reversed_axis_x)
                    ys.extend(reversed(row))
                # Scale values back to original scale for hovertext
                solution_hovertext = "".join(
                    f"<b>{col}</b>: {to_original_scale(val, col):.2f} <br>"
                    for col, val in zip(column_names, row, strict=True)
                )
                hovertexts.extend([solution_hovertext] * len(column_names))
            solutions_label = f"{result.cardinalities[cluster_id]} Solutions: {current_cluster_name}"
            fig.add_scatter(
                x=xs,
                y=ys,
                line={"color": color_solutions},
                name=solutions_label,
                mode="lines",
                legendgroup=solutions_label,
                hovertext=hovertexts,
                hoverinfo="text",
                hoveron="points+fills",
                showlegend=True,
            )
    fig.update_layout(font_size=18)
    fig.update_layout(legend={"orientation": "h", "yanchor": "top"})
    return fig

score_json

score_json(
    data: DataFrame, options: SCOREBandsConfig
) -> SCOREBandsResult

Generate the SCORE Bands data for a given dataset and configuration options.

Parameters:

Name Type Description Default
data DataFrame

Dataframe of variable (decision or objective) values. The column names should be the names of the variables to be plotted. Each row should be a solution.

required
options SCOREBandsConfig

Configuration options for generating the SCORE bands.

required

Returns:

Name Type Description
SCOREBandsResult SCOREBandsResult

The result containing all relevant data for the SCORE bands visualization.

Source code in desdeo/tools/score_bands.py
def score_json(
    data: pl.DataFrame,
    options: SCOREBandsConfig,
) -> SCOREBandsResult:
    """Generate the SCORE Bands data for a given dataset and configuration options.

    Args:
        data (pl.DataFrame): Dataframe of variable (decision or objective) values.
            The column names should be the names of the variables to be plotted. Each row should be a solution.

        options (SCOREBandsConfig): Configuration options for generating the SCORE bands. The object is copied
            before any defaults (dimensions, scales) are filled in, so the caller's instance is left untouched.

    Returns:
        SCOREBandsResult: The result containing all relevant data for the SCORE bands visualization.
    """
    options = deepcopy(options)
    # Calculating correlations and axes positions
    if options.dimensions is None:
        options.dimensions = data.columns
    data_copy = data.select([pl.col(col) for col in options.dimensions])

    if options.axis_positions is None:
        corr, dimension_order = order_dimensions(data_copy, use_absolute_corr=options.use_absolute_correlations)

        axis_dist = calculate_axes_positions(
            dimension_order,
            corr,
            dist_parameter=options.distance_parameter,
            distance_formula=options.distance_formula,
        )

        ordered_dimension_names = [data_copy.columns[i] for i in dimension_order]
        axis_positions = {name: axis_dist[i] for i, name in enumerate(ordered_dimension_names)}
    else:
        # User-provided positions also define the axis order (left to right).
        axis_positions = options.axis_positions
        ordered_dimension_names = sorted(axis_positions.keys(), key=axis_positions.get)

    # The code below relies on array arithmetic and `.tolist()`, so make sure we hold an ndarray.
    clusters = np.asarray(cluster(data_copy, options.clustering_algorithm))

    lowest_id = clusters.min()
    if lowest_id <= 0:
        clusters = clusters - lowest_id + 1  # translate minimum to 1.

    # some sanity check: check if all cluster IDs are contiguous integers starting at 1, ending at number of clusters
    present_ids = set(np.unique(clusters).tolist())
    max_cluster_id = clusters.max()
    if not all(i in present_ids for i in range(1, max_cluster_id + 1)):
        warn(
            """Cluster IDs are not contiguous integers starting at 1.
            This may cause issues with the color mapping in the visualization.""",
            category=UserWarning,
            stacklevel=2,
        )

    cluster_column_name = "cluster"
    # Avoid clashing with a dimension that is itself called 'cluster'
    if cluster_column_name in data_copy.columns:
        cluster_column_name = "cluster_id"

    data_copy = data_copy.with_columns(pl.Series(cluster_column_name, clusters))
    grouped = data_copy.group_by(cluster_column_name)
    # The band is the central `interval_size` fraction of each cluster, e.g. 0.5 -> 25th to 75th percentile.
    min_percentile = (1 - options.interval_size) / 2
    max_percentile = 1 - min_percentile
    mins = grouped.quantile(min_percentile)
    maxs = grouped.quantile(max_percentile)
    medians = grouped.median()
    frequencies = grouped.len()

    def rows_by_cluster(frame: pl.DataFrame) -> dict:
        """Index an aggregated frame by cluster ID in a single pass instead of filtering once per cell."""
        return {row[cluster_column_name]: row for row in frame.iter_rows(named=True)}

    low_rows = rows_by_cluster(mins)
    high_rows = rows_by_cluster(maxs)
    median_rows = rows_by_cluster(medians)

    bands_dict = {
        cluster_id: {col_name: (low_row[col_name], high_rows[cluster_id][col_name]) for col_name in ordered_dimension_names}
        for cluster_id, low_row in low_rows.items()
    }
    medians_dict = {
        cluster_id: {col_name: median_row[col_name] for col_name in ordered_dimension_names}
        for cluster_id, median_row in median_rows.items()
    }
    frequencies_dict = dict(
        zip(frequencies[cluster_column_name].to_list(), frequencies["len"].to_list(), strict=True)
    )

    if options.scales is None:
        # Default to the observed range of each plotted dimension.
        scales: dict[str, tuple[float, float]] = {
            dimension: (data_copy[dimension].min(), data_copy[dimension].max()) for dimension in ordered_dimension_names
        }
        options.scales = scales
    return SCOREBandsResult(
        options=options,
        ordered_dimensions=ordered_dimension_names,
        clusters=clusters.tolist(),
        axis_positions=axis_positions,
        bands=bands_dict,
        medians=medians_dict,
        cardinalities=frequencies_dict,
    )