mcdm

desdeo.mcdm.nimbus

Functions related to the Synchronous NIMBUS method.

References

Miettinen, K., & Mäkelä, M. M. (2006). Synchronous approach in interactive multiobjective optimization. European Journal of Operational Research, 170(3), 909–922.

NimbusError

Bases: Exception

Raised when an error with a NIMBUS method is encountered.

Source code in desdeo/mcdm/nimbus.py
class NimbusError(Exception):
    """Raised when an error with a NIMBUS method is encountered."""

generate_starting_point

generate_starting_point(
    problem: Problem,
    reference_point: dict[str, float] | None = None,
    scalarization_options: dict | None = None,
    solver: BaseSolver | None = None,
    solver_options: SolverOptions | None = None,
) -> SolverResults

Generates a starting point for the NIMBUS method.

Using the given reference point and the achievement scalarizing function, finds one Pareto optimal solution that can be used as a starting point for the NIMBUS method. If no reference point is given, the ideal point is used as the reference point.

Instead of using this function, the user can provide a starting point.
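
The way a missing or partial reference point falls back to the ideal point can be illustrated without the library. The following is a minimal sketch, not DESDEO code: `fill_reference_point` is a hypothetical helper, and the plain dictionaries stand in for the ideal point of a problem. It assumes that every objective missing from the reference point receives its ideal value.

```python
def fill_reference_point(ideal, reference_point=None):
    """Hypothetical helper: complete a reference point with ideal values."""
    # copy the input so that the caller's dictionary is left untouched
    filled = dict(reference_point or {})
    for symbol, ideal_value in ideal.items():
        # only objectives without a reference value fall back to the ideal
        filled.setdefault(symbol, ideal_value)
    return filled


ideal = {"f_1": 0.0, "f_2": -5.0}

full_rp = fill_reference_point(ideal)  # no reference point given
partial_rp = fill_reference_point(ideal, {"f_1": 2.5})  # only f_1 given

print(full_rp)
print(partial_rp)
```

The completed reference point is what would then be passed to the achievement scalarizing function.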

Raises:

Type Description
NimbusError

the given problem has an undefined ideal or nadir point, or both.

Parameters:

Name Type Description Default
problem Problem

the problem being solved.

required
reference_point dict[str, float] | None

an objective dictionary with a reference point. If not given, ideal will be used as reference point.

None
scalarization_options dict | None

optional kwargs passed to the scalarization function. Defaults to None.

None
solver BaseSolver | None

solver used to solve the problem. If not given, an appropriate solver will be automatically determined based on the features of problem. Defaults to None.

None
solver_options SolverOptions | None

optional options passed to the solver. Ignored if solver is None. Defaults to None.

None

Returns:

Type Description
SolverResults

SolverResults: the results of solving the achievement scalarizing function problem, containing a Pareto optimal solution that can be used as the starting point.

Source code in desdeo/mcdm/nimbus.py
def generate_starting_point(
    problem: Problem,
    reference_point: dict[str, float] | None = None,
    scalarization_options: dict | None = None,
    solver: BaseSolver | None = None,
    solver_options: SolverOptions | None = None,
) -> SolverResults:
    r"""Generates a starting point for the NIMBUS method.

    Using the given reference point and achievement scalarizing function, finds one Pareto
    optimal solution that can be used as a starting point for the NIMBUS method.
    If no reference point is given, ideal is used as the reference point.

    Instead of using this function, the user can provide a starting point.

    Raises:
        NimbusError: the given problem has an undefined ideal or nadir point, or both.

    Args:
        problem (Problem): the problem being solved.
        reference_point (dict[str, float]|None): an objective dictionary with a reference point.
            If not given, ideal will be used as reference point.
        scalarization_options (dict | None, optional): optional kwargs passed to the scalarization function.
            Defaults to None.
        solver (BaseSolver | None, optional): solver used to solve the problem.
            If not given, an appropriate solver will be automatically determined based on the features of `problem`.
            Defaults to None.
        solver_options (SolverOptions | None, optional): optional options passed
            to the `solver`. Ignored if `solver` is `None`.
            Defaults to None.

    Returns:
        SolverResults: the results of solving the achievement scalarizing function problem,
            containing a Pareto optimal solution that can be used as the starting point.
    """
    ideal = problem.get_ideal_point()
    nadir = problem.get_nadir_point()
    if None in ideal or None in nadir:
        msg = "The given problem must have both an ideal and nadir point defined."
        raise NimbusError(msg)

    if reference_point is None:
        reference_point = {}
    for obj in problem.objectives:
        if obj.symbol not in reference_point:
            reference_point[obj.symbol] = ideal[obj.symbol]

    init_solver = solver if solver is not None else guess_best_solver(problem)
    _solver_options = solver_options if solver_options is not None else None

    # solve ASF
    add_asf = add_asf_diff if problem.is_twice_differentiable else add_asf_nondiff

    problem_w_asf, asf_target = add_asf(problem, "asf", reference_point, **(scalarization_options or {}))

    asf_solver = init_solver(problem_w_asf, _solver_options) if _solver_options else init_solver(problem_w_asf)

    return asf_solver.solve(asf_target)

infer_classifications

infer_classifications(
    problem: Problem,
    current_objectives: dict[str, float],
    reference_point: dict[str, float],
) -> dict[str, tuple[str, float | None]]

Infers NIMBUS classifications based on a reference point and current objective values.

Infers the classifications based on a given reference point and current objective function values. The following classifications are inferred for each objective:

  • \(I^{<}\): values that should improve, the reference point value of an objective function is equal to its ideal value;
  • \(I^{\leq}\): values that should improve until a given aspiration level, the reference point value of an objective function is better than the current value;
  • \(I^{=}\): values that should stay as they are, the reference point value of an objective function is equal to the current value;
  • \(I^{\geq}\): values that can be impaired until some reservation level, the reference point value of an objective function is worse than the current value; and
  • \(I^{\diamond}\): values that are allowed to change freely, the reference point value of an objective function is equal to its nadir value.

The aspiration levels and the reservation levels are then given for each classification, when relevant, in the return value of this function as the following example demonstrates:

classifications = {
    "f_1": ("<", None),
    "f_2": ("<=", 42.1),
    "f_3": (">=", 22.2),
    "f_4": ("0", None)
    }
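
The inference rules listed above can be sketched for a single objective as follows. This is an illustrative sketch under stated assumptions, not the library's implementation: `classify_objective` is a hypothetical helper that takes plain floats, and the order of the checks (nadir first, then ideal, then the current value) is assumed to follow the source of this function.

```python
import numpy as np


def classify_objective(reference, current, ideal, nadir, maximize=False):
    """Hypothetical helper: classify one objective from its reference value."""
    if np.isclose(reference, nadir):
        return ("0", None)  # free to change
    if np.isclose(reference, ideal):
        return ("<", None)  # improve as much as possible
    if np.isclose(reference, current):
        return ("=", None)  # keep the current value
    # "better" means greater when maximizing and smaller when minimizing
    improves = reference > current if maximize else reference < current
    # aspiration level when the reference is better, reservation level otherwise
    return ("<=", reference) if improves else (">=", reference)


# one minimized objective with ideal 0, nadir 10, and current value 5
inferred = {
    ref: classify_objective(ref, current=5.0, ideal=0.0, nadir=10.0)
    for ref in (0.0, 3.0, 5.0, 7.0, 10.0)
}
for ref, classification in inferred.items():
    print(ref, classification)
```

Because the comparisons use `np.isclose`, a reference value that differs from the ideal, nadir, or current value only by floating point noise is still treated as equal to it.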

Raises:

Type Description
NimbusError

the ideal or nadir point, or both, of the given problem are undefined.

NimbusError

the reference point or current objectives are missing entries for one or more of the objective functions defined in the problem.

Parameters:

Name Type Description Default
problem Problem

the problem the current objectives and the reference point are related to.

required
current_objectives dict[str, float]

an objective dictionary with the current objective functions values.

required
reference_point dict[str, float]

an objective dictionary with the reference point values.

required

Returns:

Type Description
dict[str, tuple[str, float | None]]

dict[str, tuple[str, float | None]]: a dict with keys corresponding to the symbols of the objective functions defined for the problem and with values of tuples, where the first element is the classification (str) and the second element is either a reservation level (in case of classification >=) or an aspiration level (in case of classification <=).

Source code in desdeo/mcdm/nimbus.py
def infer_classifications(
    problem: Problem, current_objectives: dict[str, float], reference_point: dict[str, float]
) -> dict[str, tuple[str, float | None]]:
    r"""Infers NIMBUS classifications based on a reference point and current objective values.

    Infers the classifications based on a given reference point and current objective function
    values. The following classifications are inferred for each objective:

    - $I^{<}$: values that should improve, the reference point value of an objective
        function is equal to its ideal value;
    - $I^{\leq}$: values that should improve until a given aspiration level, the reference point
        value of an objective function is better than the current value;
    - $I^{=}$: values that should stay as they are, the reference point value of an objective
        function is equal to the current value;
    - $I^{\geq}$: values that can be impaired until some reservation level, the reference point
        value of an objective function is worse than the current value; and
    - $I^{\diamond}$: values that are allowed to change freely, the reference point value of
        an objective function is equal to its nadir value.

    The aspiration levels and the reservation levels are then given for each classification, when relevant, in
    the return value of this function as the following example demonstrates:

    ```python
    classifications = {
        "f_1": ("<", None),
        "f_2": ("<=", 42.1),
        "f_3": (">=", 22.2),
        "f_4": ("0", None)
        }
    ```

    Raises:
        NimbusError: the ideal or nadir point, or both, of the given
            problem are undefined.
        NimbusError: the reference point or current objectives are missing
            entries for one or more of the objective functions defined in
            the problem.

    Args:
        problem (Problem): the problem the current objectives and the reference point
            are related to.
        current_objectives (dict[str, float]): an objective dictionary with the current
            objective functions values.
        reference_point (dict[str, float]): an objective dictionary with the reference point
            values.

    Returns:
        dict[str, tuple[str, float | None]]: a dict with keys corresponding to the
            symbols of the objective functions defined for the problem and with values
            of tuples, where the first element is the classification (str) and the second
            element is either a reservation level (in case of classification `>=`) or an
            aspiration level (in case of classification `<=`).
    """
    if None in problem.get_ideal_point() or None in problem.get_nadir_point():
        msg = "The given problem must have both an ideal and nadir point defined."
        raise NimbusError(msg)

    if not all(obj.symbol in reference_point for obj in problem.objectives):
        msg = f"The reference point {reference_point} is missing entries " "for one or more of the objective functions."
        raise NimbusError(msg)

    if not all(obj.symbol in current_objectives for obj in problem.objectives):
        msg = (
            f"The current point {current_objectives} is missing entries " "for one or more of the objective functions."
        )
        raise NimbusError(msg)

    # derive the classifications based on the reference point and previous
    # objective function values
    classifications = {}

    for obj in problem.objectives:
        if np.isclose(reference_point[obj.symbol], obj.nadir):
            # the objective is free to change
            classification = {obj.symbol: ("0", None)}
        elif np.isclose(reference_point[obj.symbol], obj.ideal):
            # the objective should improve
            classification = {obj.symbol: ("<", None)}
        elif np.isclose(reference_point[obj.symbol], current_objectives[obj.symbol]):
            # the objective should stay as it is
            classification = {obj.symbol: ("=", None)}
        elif not obj.maximize and reference_point[obj.symbol] < current_objectives[obj.symbol]:
            # minimizing objective, reference value smaller, this is an aspiration level
            # improve until
            classification = {obj.symbol: ("<=", reference_point[obj.symbol])}
        elif not obj.maximize and reference_point[obj.symbol] > current_objectives[obj.symbol]:
            # minimizing objective, reference value is greater, this is a reservations level
            # impair until
            classification = {obj.symbol: (">=", reference_point[obj.symbol])}
        elif obj.maximize and reference_point[obj.symbol] < current_objectives[obj.symbol]:
            # maximizing objective, reference value is smaller, this is a reservation level
            # impair until
            classification = {obj.symbol: (">=", reference_point[obj.symbol])}
        elif obj.maximize and reference_point[obj.symbol] > current_objectives[obj.symbol]:
            # maximizing objective, reference value is greater, this is an aspiration level
            # improve until
            classification = {obj.symbol: ("<=", reference_point[obj.symbol])}
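        else:
            # could not figure classification (e.g., a NaN value); raise instead of
            # silently reusing the classification of a previous objective
            msg = f"NIMBUS could not figure out the classification for objective {obj.symbol}."
            raise NimbusError(msg)

        classifications |= classification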

    return classifications

solve_intermediate_solutions

solve_intermediate_solutions(
    problem: Problem,
    solution_1: dict[str, VariableType],
    solution_2: dict[str, VariableType],
    num_desired: int,
    scalarization_options: dict | None = None,
    solver: BaseSolver | None = None,
    solver_options: SolverOptions | None = None,
) -> list[SolverResults]

Generates a desired number of intermediate solutions between two given solutions.

Generates a desired number of intermediate solutions given two Pareto optimal solutions. The solutions are generated by taking num_desired steps between the two solutions in the decision space. The objective vectors corresponding to these intermediate points are then utilized as reference points in the achievement scalarizing function. Solving the scalarized problem for each reference point will project the reference point onto the Pareto optimal front of the problem. These projected solutions are then returned. Note that the intermediate solutions are generated between the two given solutions; the returned solutions will therefore not include the original points.
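
The stepping between the two given solutions can be sketched with plain NumPy arrays standing in for flattened decision vectors. This is a minimal sketch and not the library's code: `intermediate_decision_points` is a hypothetical helper, and its step size (the difference divided by `2 + num_desired`) is assumed to match the arithmetic in the source of this function.

```python
import numpy as np


def intermediate_decision_points(x_1, x_2, num_desired):
    """Hypothetical helper: points strictly between two decision vectors."""
    x_1 = np.asarray(x_1, dtype=float)
    x_2 = np.asarray(x_2, dtype=float)
    # step from x_2 towards x_1; the end points themselves are never produced
    step = (x_1 - x_2) / (2 + num_desired)
    return np.array([x_2 + i * step for i in range(1, num_desired + 1)])


points = intermediate_decision_points([1.0, 0.0], [0.0, 1.0], num_desired=2)
print(points)
```

In the library, each such point is evaluated to obtain an objective vector, which then serves as the reference point of an achievement scalarizing function problem.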

Parameters:

Name Type Description Default
problem Problem

the problem being solved.

required
solution_1 dict[str, VariableType]

the first of the solutions between which the intermediate solutions are to be generated.

required
solution_2 dict[str, VariableType]

the second of the solutions between which the intermediate solutions are to be generated.

required
num_desired int

the number of desired intermediate solutions to be generated. Must be at least 1.

required
scalarization_options dict | None

optional kwargs passed to the scalarization function. Defaults to None.

None
solver BaseSolver | None

solver used to solve the problem. If not given, an appropriate solver will be automatically determined based on the features of problem. Defaults to None.

None
solver_options SolverOptions | None

optional options passed to the solver. Ignored if solver is None. Defaults to None.

None

Returns:

Type Description
list[SolverResults]

list[SolverResults]: a list with the projected intermediate solutions as SolverResults objects.

Source code in desdeo/mcdm/nimbus.py
def solve_intermediate_solutions(  # noqa: PLR0913
    problem: Problem,
    solution_1: dict[str, VariableType],
    solution_2: dict[str, VariableType],
    num_desired: int,
    scalarization_options: dict | None = None,
    solver: BaseSolver | None = None,
    solver_options: SolverOptions | None = None,
) -> list[SolverResults]:
    """Generates a desired number of intermediate solutions between two given solutions.

    Generates a desired number of intermediate solutions given two Pareto optimal solutions.
    The solutions are generated by taking `num_desired` steps between the two solutions in the
    decision space. The objective vectors corresponding to these intermediate points are then
    utilized as reference points in the achievement scalarizing function. Solving the scalarized
    problem for each reference point will project the reference point onto the Pareto optimal
    front of the problem. These projected solutions are then returned. Note that the
    intermediate solutions are generated _between_ the two given solutions; the
    returned solutions will therefore not include the original points.

    Args:
        problem (Problem): the problem being solved.
        solution_1 (dict[str, VariableType]): the first of the solutions between which the intermediate
            solutions are to be generated.
        solution_2 (dict[str, VariableType]): the second of the solutions between which the intermediate
            solutions are to be generated.
        num_desired (int): the number of desired intermediate solutions to be generated. Must be at least `1`.
        scalarization_options (dict | None, optional): optional kwargs passed to the scalarization function.
            Defaults to None.
        solver (BaseSolver | None, optional): solver used to solve the problem.
            If not given, an appropriate solver will be automatically determined based on the features of `problem`.
            Defaults to None.
        solver_options (SolverOptions | None, optional): optional options passed
            to the `solver`. Ignored if `solver` is `None`.
            Defaults to None.

    Returns:
        list[SolverResults]: a list with the projected intermediate solutions as
            `SolverResults` objects.
    """
    if int(num_desired) < 1:
        msg = f"The given number of desired intermediate ({num_desired=}) solutions must be at least 1."
        raise NimbusError(msg)

    init_solver = guess_best_solver(problem) if solver is None else solver
    _solver_options = None if solver_options is None or solver is None else solver_options

    # compute the element-wise difference between each solution (in the decision space)
    solution_1_arr = flatten_variable_dict(problem, solution_1)
    solution_2_arr = flatten_variable_dict(problem, solution_2)
    delta = solution_1_arr - solution_2_arr

    # the '2' is in the denominator because we want to calculate the steps
    # between the two given points; we are not interested in the given points themselves.
    step_size = delta / (2 + num_desired)

    intermediate_points = np.array([solution_2_arr + i * step_size for i in range(1, num_desired + 1)])

    intermediate_var_values = pl.DataFrame(
        [unflatten_variable_array(problem, x) for x in intermediate_points],
        schema=[
            (var.symbol, pl.Float64 if isinstance(var, Variable) else pl.Array(pl.Float64, tuple(var.shape)))
            for var in problem.variables
        ],
    )

    # evaluate the intermediate points to get reference points
    # TODO(gialmisi): an evaluator might have to be selected depending on the problem
    evaluator = PolarsEvaluator(problem)

    reference_points = (
        evaluator.evaluate(intermediate_var_values).select([obj.symbol for obj in problem.objectives]).to_dicts()
    )

    # for each reference point, add and solve the ASF scalarization problem
    # projecting the reference point onto the Pareto optimal front of the problem.
    # TODO(gialmisi): this can be done in parallel.
    intermediate_solutions = []
    for rp in reference_points:
        # add scalarization
        add_asf = add_asf_diff if problem.is_twice_differentiable else add_asf_nondiff
        asf_problem, target = add_asf(problem, "target", rp, **(scalarization_options or {}))

        solver = init_solver(asf_problem, _solver_options)

        # solve and store results
        result: SolverResults = solver.solve(target)

        intermediate_solutions.append(result)

    return intermediate_solutions

solve_sub_problems

solve_sub_problems(
    problem: Problem,
    current_objectives: dict[str, float],
    reference_point: dict[str, float],
    num_desired: int,
    scalarization_options: dict | None = None,
    solver: BaseSolver | None = None,
    solver_options: SolverOptions | None = None,
) -> list[SolverResults]

Solves a desired number of sub-problems as defined in the NIMBUS methods.

Solves 1-4 scalarized problems utilizing different scalarization functions. The scalarizations are based on the classification of a solution provided by a decision maker. The classifications are represented by a reference point. Returns a number of new solutions corresponding to the number of scalarization functions solved.

Depending on num_desired, solves the scalarized problems corresponding to the following scalarization functions, in this order:

  1. the NIMBUS scalarization function,
  2. the STOM scalarization function,
  3. the achievement scalarizing function, and
  4. the GUESS scalarization function.
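
How `num_desired` selects among the four scalarizations can be sketched as follows. This is an illustrative sketch only: `scalarizations_to_solve` is a hypothetical helper that returns labels rather than solving anything, and the explicit range check is an assumption added here for clarity.

```python
# labels only; the order follows the numbered list above
SCALARIZATIONS = ("NIMBUS", "STOM", "ASF", "GUESS")


def scalarizations_to_solve(num_desired):
    """Hypothetical helper: names of the scalarizations solved for num_desired."""
    if not 1 <= num_desired <= len(SCALARIZATIONS):
        # assumption of this sketch: reject values outside the range 1-4
        msg = f"num_desired must be in the range 1-4, got {num_desired}."
        raise ValueError(msg)
    # the NIMBUS scalarization is always included; the rest are added in order
    return list(SCALARIZATIONS[:num_desired])


print(scalarizations_to_solve(1))
print(scalarizations_to_solve(3))
```

The NIMBUS scalarization uses the inferred classifications, whereas the other three use the reference point directly.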

Raises:

Type Description
NimbusError

the given problem has an undefined ideal or nadir point, or both.

NimbusError

either the reference point or the current objective function values are missing entries for one or more of the objective functions defined in the problem.

Parameters:

Name Type Description Default
problem Problem

the problem being solved.

required
current_objectives dict[str, float]

an objective dictionary with the objective functions values the classifications have been given with respect to.

required
reference_point dict[str, float]

an objective dictionary with a reference point. The classifications utilized in the sub problems are derived from the reference point.

required
num_desired int

the number of desired solutions to be solved. Solves as many scalarized problems. The value must be in the range 1-4.

required
scalarization_options dict | None

optional kwargs passed to the scalarization function. Defaults to None.

None
solver BaseSolver | None

solver used to solve the problem. If not given, an appropriate solver will be automatically determined based on the features of problem. Defaults to None.

None
solver_options SolverOptions | None

optional options passed to the solver. Ignored if solver is None. Defaults to None.

None

Returns:

Type Description
list[SolverResults]

list[SolverResults]: a list of SolverResults objects. Contains as many elements as defined in num_desired.

Source code in desdeo/mcdm/nimbus.py
def solve_sub_problems(  # noqa: PLR0913
    problem: Problem,
    current_objectives: dict[str, float],
    reference_point: dict[str, float],
    num_desired: int,
    scalarization_options: dict | None = None,
    solver: BaseSolver | None = None,
    solver_options: SolverOptions | None = None,
) -> list[SolverResults]:
    r"""Solves a desired number of sub-problems as defined in the NIMBUS methods.

    Solves 1-4 scalarized problems utilizing different scalarization
    functions. The scalarizations are based on the classification of a
    solution provided by a decision maker. The classifications
    are represented by a reference point. Returns a number of new solutions
    corresponding to the number of scalarization functions solved.

    Depending on `num_desired`, solves the following scalarized problems corresponding
    to the following scalarization functions:

    1.  the NIMBUS scalarization function,
    2.  the STOM scalarization function,
    3.  the achievement scalarizing function, and
    4.  the GUESS scalarization function.

    Raises:
        NimbusError: the given problem has an undefined ideal or nadir point, or both.
        NimbusError: either the reference point or the current objective function values are
            missing entries for one or more of the objective functions defined in the problem.

    Args:
        problem (Problem): the problem being solved.
        current_objectives (dict[str, float]): an objective dictionary with the objective functions values
            the classifications have been given with respect to.
        reference_point (dict[str, float]): an objective dictionary with a reference point.
            The classifications utilized in the sub problems are derived from
            the reference point.
        num_desired (int): the number of desired solutions to be solved. Solves as
            many scalarized problems. The value must be in the range 1-4.
        scalarization_options (dict | None, optional): optional kwargs passed to the scalarization function.
            Defaults to None.
        solver (BaseSolver | None, optional): solver used to solve the problem.
            If not given, an appropriate solver will be automatically determined based on the features of `problem`.
            Defaults to None.
        solver_options (SolverOptions | None, optional): optional options passed
            to the `solver`. Ignored if `solver` is `None`.
            Defaults to None.

    Returns:
        list[SolverResults]: a list of `SolverResults` objects. Contains as many elements
            as defined in `num_desired`.
    """
    if None in problem.get_ideal_point() or None in problem.get_nadir_point():
        msg = "The given problem must have both an ideal and nadir point defined."
        raise NimbusError(msg)

    if not all(obj.symbol in reference_point for obj in problem.objectives):
        msg = f"The reference point {reference_point} is missing entries " "for one or more of the objective functions."
        raise NimbusError(msg)

    if not all(obj.symbol in current_objectives for obj in problem.objectives):
        msg = f"The current point {current_objectives} is missing entries " "for one or more of the objective functions."
        raise NimbusError(msg)

    init_solver = solver if solver is not None else guess_best_solver(problem)
    _solver_options = solver_options if solver_options is not None else None

    # derive the classifications based on the reference point and previous
    # objective function values
    classifications = infer_classifications(problem, current_objectives, reference_point)

    solutions = []

    # solve the nimbus scalarization problem, this is done always
    add_nimbus_sf = add_nimbus_sf_diff if problem.is_twice_differentiable else add_nimbus_sf_nondiff

    problem_w_nimbus, nimbus_target = add_nimbus_sf(
        problem, "nimbus_sf", classifications, current_objectives, **(scalarization_options or {})
    )

    nimbus_solver = init_solver(problem_w_nimbus, _solver_options) if _solver_options else init_solver(problem_w_nimbus)

    solutions.append(nimbus_solver.solve(nimbus_target))

    if num_desired > 1:
        # solve STOM
        add_stom_sf = add_stom_sf_diff if problem.is_twice_differentiable else add_stom_sf_nondiff

        problem_w_stom, stom_target = add_stom_sf(problem, "stom_sf", reference_point, **(scalarization_options or {}))
        stom_solver = init_solver(problem_w_stom, _solver_options) if _solver_options else init_solver(problem_w_stom)

        solutions.append(stom_solver.solve(stom_target))

    if num_desired > 2:  # noqa: PLR2004
        # solve ASF
        add_asf = add_asf_diff if problem.is_twice_differentiable else add_asf_nondiff

        problem_w_asf, asf_target = add_asf(problem, "asf", reference_point, **(scalarization_options or {}))

        asf_solver = init_solver(problem_w_asf, _solver_options) if _solver_options else init_solver(problem_w_asf)

        solutions.append(asf_solver.solve(asf_target))

    if num_desired > 3:  # noqa: PLR2004
        # solve GUESS
        add_guess_sf = add_guess_sf_diff if problem.is_twice_differentiable else add_guess_sf_nondiff

        problem_w_guess, guess_target = add_guess_sf(
            problem, "guess_sf", reference_point, **(scalarization_options or {})
        )

        if _solver_options:
            guess_solver = init_solver(problem_w_guess, _solver_options)
        else:
            guess_solver = init_solver(problem_w_guess)

        solutions.append(guess_solver.solve(guess_target))

    return solutions

desdeo.mcdm.enautilus

Functions related to the E-NAUTILUS method are defined here.

Reference of the method:

Ruiz, A. B., Sindhya, K., Miettinen, K., Ruiz, F., & Luque, M. (2015). E-NAUTILUS: A decision support system for complex multiobjective optimization problems based on the NAUTILUS method. European Journal of Operational Research, 246(1), 218-231.

ENautilusResult

Bases: BaseModel

The result of an iteration of the E-NAUTILUS method.

Source code in desdeo/mcdm/enautilus.py
class ENautilusResult(BaseModel):
    """The result of an iteration of the E-NAUTILUS method."""

    current_iteration: int = Field(description="Number of the current iteration.")
    iterations_left: int = Field(description="Number of iterations left.")
    intermediate_points: list[dict[str, float]] = Field(description="New intermediate points")
    reachable_best_bounds: list[dict[str, float]] = Field(
        description="Best bounds of the objective function values reachable from each intermediate point."
    )
    reachable_worst_bounds: list[dict[str, float]] = Field(
        description="Worst bounds of the objective function values reachable from each intermediate point."
    )
    closeness_measures: list[float] = Field(description="Closeness measures of each intermediate point.")
    reachable_point_indices: list[list[int]] = Field(
        description="Indices of the reachable points from each intermediate point."
    )

calculate_closeness

calculate_closeness(
    z_intermediate: ndarray,
    z_nadir: ndarray,
    z_representative: ndarray,
) -> float

Calculate the closeness of an intermediate point to the non-dominated set.

The greater the closeness is, the closer the intermediate point is to the non-dominated set.
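
A small worked example may help in reading the measure. The sketch below restates the formula from the source of this function in a standalone helper named `closeness` (a name chosen here for illustration) and evaluates it on made-up points: the closeness is 0 at the nadir point and 100 at the representative solution.

```python
import numpy as np


def closeness(z_intermediate, z_nadir, z_representative):
    """Standalone restatement of the closeness formula, as a percentage."""
    return (
        np.linalg.norm(z_intermediate - z_nadir)
        / np.linalg.norm(z_representative - z_nadir)
        * 100
    )


# illustrative values only
z_nadir = np.array([10.0, 10.0])
z_representative = np.array([2.0, 4.0])
z_halfway = (z_nadir + z_representative) / 2  # midpoint of the two points

at_nadir = closeness(z_nadir, z_nadir, z_representative)
at_halfway = closeness(z_halfway, z_nadir, z_representative)
at_representative = closeness(z_representative, z_nadir, z_representative)

print(at_nadir, at_halfway, at_representative)
```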

Parameters:

Name Type Description Default
z_intermediate ndarray

the intermediate point.

required
z_nadir ndarray

the nadir point of the non-dominated set.

required
z_representative ndarray

the representative solution of z_intermediate.

required

Returns:

Type Description
float

the closeness measure.

Source code in desdeo/mcdm/enautilus.py
def calculate_closeness(z_intermediate: np.ndarray, z_nadir: np.ndarray, z_representative: np.ndarray) -> float:
    """Calculate the closeness of an intermediate point to the non-dominated set.

    The greater the closeness is, the closer the intermediate point is to the non-dominated set.

    Args:
        z_intermediate (np.ndarray): the intermediate point.
        z_nadir (np.ndarray): the nadir point of the non-dominated set.
        z_representative (np.ndarray): the representative solution of `z_intermediate`.

    Returns:
        float: the closeness measure.
    """
    return np.linalg.norm(z_intermediate - z_nadir) / np.linalg.norm(z_representative - z_nadir) * 100

calculate_intermediate_points

calculate_intermediate_points(
    z_previous: ndarray,
    zs_representatives: ndarray,
    iterations_left: int,
) -> np.ndarray

Calculates the intermediate points to be shown to the decision maker at each iteration.

The number of returned points depends on how many zs_representatives points are supplied.
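
The computation is a convex combination of the previous point and each representative solution, which a short example can make concrete. The sketch below restates the formula from the source of this function in a standalone helper named `intermediate_points` (a name chosen here for illustration); the numeric values are made up.

```python
import numpy as np


def intermediate_points(z_previous, zs_representatives, iterations_left):
    """Standalone restatement: one intermediate point per representative."""
    weight_previous = (iterations_left - 1) / iterations_left
    weight_representative = 1 / iterations_left
    # z_previous is broadcast against every row of zs_representatives
    return weight_previous * z_previous + weight_representative * zs_representatives


z_previous = np.array([10.0, 10.0])
zs_representatives = np.array([[2.0, 4.0], [6.0, 0.0]])

with_four_left = intermediate_points(z_previous, zs_representatives, iterations_left=4)
with_one_left = intermediate_points(z_previous, zs_representatives, iterations_left=1)

print(with_four_left)
print(with_one_left)
```

With one iteration left, the weight of the previous point is zero, so the intermediate points coincide with the representative solutions.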

Parameters:

Name Type Description Default
z_previous ndarray

the point selected by the decision maker in the previous iteration.

required
zs_representatives ndarray

the representative solutions at the current iteration.

required
iterations_left int

the number of iterations left (including the current one).

required

Returns:

Type Description
ndarray

np.ndarray: an array of intermediate points.

Source code in desdeo/mcdm/enautilus.py
def calculate_intermediate_points(
    z_previous: np.ndarray, zs_representatives: np.ndarray, iterations_left: int
) -> np.ndarray:
    """Calculates the intermediate points to be shown to the decision maker at each iteration.

    The number of returned points depends on how many `zs_representatives` points are supplied.

    Args:
        z_previous (np.ndarray): the point selected by the decision maker in the previous iteration.
        zs_representatives (np.ndarray): the representative solutions at the current iteration.
        iterations_left (int): the number of iterations left (including the current one).

    Returns:
        np.ndarray: an array of intermediate points.
    """
    return ((iterations_left - 1) / iterations_left) * z_previous + (1 / iterations_left) * zs_representatives

calculate_lower_bounds

calculate_lower_bounds(
    non_dominated_points: ndarray, z_intermediate: ndarray
) -> np.ndarray

Calculates the lower bounds of reachable solutions from an intermediate point.

The lower bounds are calculated by solving an epsilon-constraint problem with the epsilon values taken from the intermediate point.

Parameters:

Name Type Description Default
non_dominated_points ndarray

a set of non-dominated points according to which the reachable values are computed.

required
z_intermediate ndarray

the intermediate point according to which the lower bounds are calculated.

required

Returns:

Type Description
ndarray

np.ndarray: the lower bounds of the solutions in the non-dominated set that are reachable from the intermediate point.

Source code in desdeo/mcdm/enautilus.py
def calculate_lower_bounds(non_dominated_points: np.ndarray, z_intermediate: np.ndarray) -> np.ndarray:
    """Calculates the lower bounds of reachable solutions from an intermediate point.

    The lower bounds are calculated by solving an epsilon-constraint problem
    with the epsilon values taken from the intermediate point.

    Args:
        non_dominated_points (np.ndarray): a set of non-dominated points
            according to which the reachable values are computed.
        z_intermediate (np.ndarray): the intermediate point according to which
            the lower bounds are calculated.

    Returns:
        np.ndarray: the lower bounds of the solutions in the non-dominated
            set that are reachable from the intermediate point.
    """
    k = non_dominated_points.shape[1]
    bounds = []

    for r in range(k):
        # Indices of objectives other than r
        other = np.delete(np.arange(k), r)

        # Find points that are no worse than z_intermediate in all objectives except r
        mask = np.all(non_dominated_points[:, other] <= z_intermediate[other], axis=1)
        feasible = non_dominated_points[mask]

        if feasible.size > 0:
            bounds.append(np.min(feasible[:, r]))
        else:
            bounds.append(np.inf)  # No feasible point in this projection

    return np.array(bounds)
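
To make the epsilon-constraint idea concrete, the sketch below repeats the same masking logic in a standalone helper and runs it on three made-up points. The name `lower_bounds_sketch` and the data are assumptions for illustration; all objectives are treated as minimized.

```python
import numpy as np


def lower_bounds_sketch(non_dominated_points, z_intermediate):
    """Hypothetical stand-alone copy of the masking logic shown above."""
    k = non_dominated_points.shape[1]
    bounds = []
    for r in range(k):
        # Objectives other than r act as epsilon constraints.
        other = np.delete(np.arange(k), r)
        mask = np.all(non_dominated_points[:, other] <= z_intermediate[other], axis=1)
        feasible = non_dominated_points[mask]
        bounds.append(np.min(feasible[:, r]) if feasible.size > 0 else np.inf)
    return np.array(bounds)


# Illustrative, mutually non-dominated points with two minimized objectives.
points = np.array([[1.0, 5.0], [2.0, 3.0], [4.0, 1.0]])

bounds = lower_bounds_sketch(points, np.array([3.0, 4.0]))
unreachable = lower_bounds_sketch(points, np.array([0.5, 0.5]))

print(bounds)
print(unreachable)
```

For the first objective only the points with a second objective value of at most 4 qualify, so its bound is 2. When no point satisfies the constraints, the bound is reported as infinity.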

calculate_reachable_subset

calculate_reachable_subset(
    non_dominated_points: ndarray,
    reachable_indices: ndarray,
    lower_bounds: ndarray,
    z_preferred: ndarray,
) -> list[int]

Calculates the reachable subset on a non-dominated set from a selected intermediate point.

Parameters:

Name Type Description Default
non_dominated_points ndarray

the original set of non-dominated points.

required
reachable_indices ndarray

the currently reachable indices, which will be used to select the new reachable points.

required
lower_bounds ndarray

the lower bounds of the reachable subset of non-dominated points.

required
z_preferred ndarray

the selected intermediate point with respect to which the reachable subset is calculated.

required

Returns:

Type Description
list[int]

list[int]: the indices of the reachable solutions in non_dominated_points.

Source code in desdeo/mcdm/enautilus.py
def calculate_reachable_subset(
    non_dominated_points: np.ndarray, reachable_indices: np.ndarray, lower_bounds: np.ndarray, z_preferred: np.ndarray
) -> list[int]:
    """Calculates the reachable subset on a non-dominated set from a selected intermediate point.

    Args:
        non_dominated_points (np.ndarray): the original set of non-dominated points.
        reachable_indices (np.ndarray): the currently reachable indices, which
            will be used to select the new reachable points.
        lower_bounds (np.ndarray): the lower bounds of the reachable subset of non-dominated points.
        z_preferred (np.ndarray): the selected intermediate point with respect to which the reachable
            subset is calculated.

    Returns:
        list[int]: the indices of the reachable solutions
    """
    return [
        i
        for i, z in enumerate(non_dominated_points)
        if np.all(lower_bounds <= z) and np.all(z <= z_preferred) and i in reachable_indices
    ]
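
The filter above keeps a point only if it lies inside the box spanned by the lower bounds and the selected intermediate point, and if it was already reachable. A minimal sketch on made-up data (the helper name `reachable_subset_sketch` and all values are assumptions for illustration):

```python
import numpy as np


def reachable_subset_sketch(non_dominated_points, reachable_indices, lower_bounds, z_preferred):
    """Hypothetical stand-alone copy of the list comprehension shown above."""
    return [
        i
        for i, z in enumerate(non_dominated_points)
        if np.all(lower_bounds <= z) and np.all(z <= z_preferred) and i in reachable_indices
    ]


# Illustrative values only; all objectives are treated as minimized.
points = np.array([[1.0, 5.0], [2.0, 3.0], [4.0, 1.0]])
lower_bounds = np.array([2.0, 3.0])
z_preferred = np.array([3.0, 4.0])

all_reachable = reachable_subset_sketch(points, [0, 1, 2], lower_bounds, z_preferred)
already_excluded = reachable_subset_sketch(points, [0, 2], lower_bounds, z_preferred)

print(all_reachable)
print(already_excluded)
```

Only the point at index 1 lies inside the box. If that index was not reachable beforehand, the result is empty, which shows that the reachable set can only shrink between iterations.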

enautilus_get_representative_solutions

enautilus_get_representative_solutions(
    problem: Problem,
    result: ENautilusResult,
    non_dominated_points: DataFrame,
) -> list[SolverResults]

Returns the solution corresponding to the intermediate points.

The representative points are selected based on the current intermediate points. If the number of iterations left is 0, then the intermediate and representative points are equal.

Parameters:

Name Type Description Default
problem Problem

the problem being solved.

required
result ENautilusResult

an ENautilusResult returned by enautilus_step.

required
non_dominated_points DataFrame

a dataframe from which the representative solutions are taken.

required

Returns:

Name Type Description
SolverResults list[SolverResults]

full information about the solutions. If information other than just objective function values are expected, then the supplied non_dominated_points should contain this information.

Source code in desdeo/mcdm/enautilus.py
def enautilus_get_representative_solutions(
    problem: Problem, result: ENautilusResult, non_dominated_points: pl.DataFrame
) -> list[SolverResults]:
    """Returns the solution corresponding to the intermediate points.

    The representative points are selected based on the current intermediate points.
    If the number of iterations left is 0, then the intermediate and representative points
    are equal.

    Args:
        problem (Problem): the problem being solved.
        result (ENautilusResult): an ENautilusResult returned by `enautilus_step`.
        non_dominated_points (pl.DataFrame): a dataframe from which the
            representative solutions are taken.

    Returns:
        SolverResults: full information about the solutions. If information
            other than just objective function values are expected, then the
            supplied `non_dominated_points` should contain this information.
    """
    obj_syms = [obj.symbol for obj in problem.objectives]
    const_syms = [con.symbol for con in problem.constraints] if problem.constraints else None
    extra_syms = [extra.symbol for extra in problem.extra_funcs] if problem.extra_funcs else None
    scal_syms = [scal.symbol for scal in problem.scalarization_funcs] if problem.scalarization_funcs else None

    # Build the list of variable column names present in the DataFrame.
    # Scalar variables use their symbol directly; tensor variables are unrolled
    # into columns like "sv_1", "sv_2", ... matching the convention used by
    # solve_epsilon.py and the representative solution set upload.
    available_cols = set(non_dominated_points.columns)
    var_col_names: list[str] = []
    for var in problem.variables:
        if isinstance(var, TensorVariable):
            # Collect unrolled columns: symbol_1, symbol_2, ...
            n_elements = 1
            for d in var.shape:
                n_elements *= d
            for i in range(1, n_elements + 1):
                col = f"{var.symbol}_{i}"
                if col in available_cols:
                    var_col_names.append(col)
        else:
            if var.symbol in available_cols:
                var_col_names.append(var.symbol)

    # Objective matrix (rows = ND points, cols = objectives, original senses)
    obj_matrix = non_dominated_points.select(obj_syms).to_numpy()

    solver_results: list[SolverResults] = []

    for interm in result.intermediate_points:
        interm_vec = np.array([interm[sym] for sym in obj_syms], dtype=float)

        # Find index of closest ND point (Euclidean distance)
        idx = int(np.argmin(np.linalg.norm(obj_matrix - interm_vec, axis=1)))

        row = non_dominated_points[idx]

        var_dict = {col: row[col] for col in var_col_names}
        obj_dict = {sym: row[sym] for sym in obj_syms}
        const_dict = {sym: row[sym] for sym in const_syms if sym in row} if const_syms is not None else None
        extra_dict = {sym: row[sym] for sym in extra_syms if sym in row} if extra_syms is not None else None
        scal_dict = {sym: row[sym] for sym in scal_syms if sym in row} if scal_syms is not None else None

        solver_results.append(
            SolverResults(
                optimal_variables=var_dict,
                optimal_objectives=obj_dict,
                constraint_values=const_dict,
                extra_func_values=extra_dict,
                scalarization_values=scal_dict,
                success=True,
                message="E-NAUTILUS: nearest non-dominated point selected for intermediate point.",
            )
        )

    return solver_results

enautilus_step

enautilus_step(
    problem: Problem,
    non_dominated_points: DataFrame | dict[str, float],
    current_iteration: int,
    iterations_left: int,
    selected_point: dict[str, float],
    reachable_point_indices: list[int],
    number_of_intermediate_points: int,
) -> ENautilusResult

Compute one iteration of the E-NAUTILUS method.

It is assumed that information from a previous iteration (selected point, etc.) is available either from a previous iteration of E-NAUTILUS, or if this is the first iteration, then the selected (intermediate) point selected_point should be the approximated nadir point from non_dominated_points. In this case, the reachable_point_indices should cover the whole of non_dominated_points. After the first iteration, all the information for computing the next iteration is always available from the previous iteration's result of this function (plus the selected_point provided by e.g., a decision maker).

Parameters:

Name Type Description Default
problem Problem

the problem being solved. Used mainly for manipulating the other arguments.

required
non_dominated_points DataFrame

a set of non-dominated points approximating the Pareto front of Problem. This should be a Polars dataframe with at least the columns that match the objective function symbols in Problem and the corresponding minimization value columns. I.e., for an objective with symbol 'f1' the dataframe should have the columns 'f1' and 'f1_min', where the column 'f1_min' has the corresponding values of 'f1', but assuming minimization (N.B. if 'f1' is minimized, then 'f1_min' would have the same values as 'f1'). If provided as a dict, this will be converted to a Polars dataframe.

required
current_iteration int

the number of the current iteration. For the first iteration, this should be zero.

required
iterations_left int

how many iterations are left (counting the current one).

required
selected_point dict[str, float]

the selected intermediate point in the previous iteration. If this is the first iteration, then this should be the nadir point approximated from non_dominated_points.

required
reachable_point_indices list[int]

the indices of the points in non_dominated_points that are reachable from selected_point.

required
number_of_intermediate_points int

how many intermediate points are generated.

required

Returns:

Name Type Description
ENautilusResult ENautilusResult

the result of the iteration.

Source code in desdeo/mcdm/enautilus.py
def enautilus_step(  # noqa: PLR0913
    problem: Problem,
    non_dominated_points: pl.DataFrame | dict[str, float],
    current_iteration: int,
    iterations_left: int,
    selected_point: dict[str, float],
    reachable_point_indices: list[int],
    number_of_intermediate_points: int,
) -> ENautilusResult:
    """Compute one iteration of the E-NAUTILUS method.

    It is assumed that information from a previous iteration (selected point,
    etc.) is available either from a previous iteration of E-NAUTILUS, or if
    this is the first iteration, then the selected (intermediate) point
    `selected_point` should be the approximated nadir point from
    `non_dominated_points`. In this case, the `reachable_point_indices` should
    cover the whole of `non_dominated_points`. After the first iteration, all
    the information for computing the next iteration is always available from
    the previous iteration's result of this function (plus the `selected_point`
    provided by e.g., a decision maker).

    Args:
        problem (Problem): the problem being solved. Used mainly for manipulating the other arguments.
        non_dominated_points (pl.DataFrame): a set of non-dominated points
            approximating the Pareto front of `Problem`. This should be a Polars
            dataframe with at least columns that match the objective function
            symbols in `Problem` and the corresponding minimization value column.
            I.e., for an objective with symbol 'f1' the dataframe should have the
            symbols 'f1' and 'f1_min', where the column 'f1_min' has the
            corresponding values of 'f1', but assuming minimization (N.B. if 'f1' is
            minimized, then 'f1_min' would have identical values as 'f1'). If provided
            as a `dict`, this will be converted to a polars dataframe.
        current_iteration (int): the number of the current iteration. For the first iteration, this should be zero.
        iterations_left (int): how many iterations are left (counting the current one).
        selected_point (dict[str, float]): the selected intermediate point in
            the previous iteration. If this is the first iteration, then this should
            be the nadir point approximated from `non_dominated_points`.
        reachable_point_indices (list[int]): the indices of the points in
            `non_dominated_points` that are reachable from
            `selected_point`.
        number_of_intermediate_points (int): how many intermediate points are generated.

    Returns:
        ENautilusResult: the result of the iteration.
    """
    # treat everything as minimized
    # selected point as numpy array, correct for minimization
    z_h = objective_dict_to_numpy_array(problem, flip_maximized_objective_values(problem, selected_point))

    # subset of reachable solutions, take _min column
    if isinstance(non_dominated_points, dict):
        # dict converted to Polars dataframe
        _non_dominated_points = pl.DataFrame(non_dominated_points)
    else:
        # already Polars dataframe
        _non_dominated_points = non_dominated_points

    non_dom_objectives = _non_dominated_points[[f"{obj.symbol}_min" for obj in problem.objectives]].to_numpy()
    p_h = non_dom_objectives[reachable_point_indices]

    # estimate nadir from non-dominated points, treating as minimized problem
    z_nadir = non_dom_objectives.max(axis=0)

    # compute representative points
    representative_points = prune_by_average_linkage(p_h, number_of_intermediate_points)

    # calculate intermediate points
    intermediate_points = calculate_intermediate_points(z_h, representative_points, iterations_left)

    # calculate lower bounds
    intermediate_lower_bounds = [
        calculate_lower_bounds(p_h, intermediate_point) for intermediate_point in intermediate_points
    ]

    # calculate closeness measures
    closeness_measures = [
        calculate_closeness(intermediate_point, z_nadir, representative_point)
        for (intermediate_point, representative_point) in zip(intermediate_points, representative_points, strict=True)
    ]

    # calculate the indices of the reachable points for each intermediate point
    reachable_from_intermediate = [
        calculate_reachable_subset(non_dom_objectives, reachable_point_indices, lower_bounds, interm)
        for lower_bounds, interm in zip(intermediate_lower_bounds, intermediate_points, strict=True)
    ]

    best_bounds = [
        flip_maximized_objective_values(problem, numpy_array_to_objective_dict(problem, bounds))
        for bounds in intermediate_lower_bounds
    ]
    worst_bounds = [
        flip_maximized_objective_values(problem, numpy_array_to_objective_dict(problem, point))
        for point in intermediate_points
    ]

    corrected_intermediate_points = [
        flip_maximized_objective_values(problem, numpy_array_to_objective_dict(problem, point))
        for point in intermediate_points
    ]

    return ENautilusResult(
        current_iteration=current_iteration + 1,
        iterations_left=iterations_left - 1,
        intermediate_points=corrected_intermediate_points,
        reachable_best_bounds=best_bounds,
        reachable_worst_bounds=worst_bounds,
        closeness_measures=closeness_measures,
        reachable_point_indices=reachable_from_intermediate,
    )

prune_by_average_linkage

prune_by_average_linkage(
    non_dominated_points: ndarray, k: int
) -> np.ndarray

Prune a set of non-dominated points using average linkage clustering (Morse, 1980).

This is used to calculate the representative solutions in E-NAUTILUS.

Parameters:

Name Type Description Default
non_dominated_points ndarray

an array of non-dominated points in objective space.

required
k int

Number of representative points to retain.

required

Returns:

Type Description
ndarray

np.ndarray: an array of representative points.

Source code in desdeo/mcdm/enautilus.py
def prune_by_average_linkage(non_dominated_points: np.ndarray, k: int) -> np.ndarray:
    """Prune a set of non-dominated points using average linkage clustering (Morse, 1980).

    This is used to calculate the representative solutions in E-NAUTILUS.

    Args:
        non_dominated_points (np.ndarray): an array of non-dominated points in objective space.
        k (int): Number of representative points to retain.

    Returns:
        np.ndarray: an array of representative points.
    """
    if len(non_dominated_points) <= k:
        # no need to prune
        return non_dominated_points

    # Compute pairwise distances
    distances = pdist(non_dominated_points, metric="euclidean")

    # Hierarchical clustering using average linkage
    z = linkage(distances, method="average")

    # Cut tree to form k clusters
    cluster_labels = fcluster(z, k, criterion="maxclust")

    # For each cluster, choose the point closest to the centroid
    representatives = []
    for cluster_id in range(1, k + 1):
        cluster_points = non_dominated_points[cluster_labels == cluster_id]
        centroid = cluster_points.mean(axis=0)
        closest_idx = np.argmin(np.linalg.norm(cluster_points - centroid, axis=1))
        representatives.append(cluster_points[closest_idx])

    return np.array(representatives)
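
The pruning step can be tried on a toy set with two clearly separated groups. The sketch below uses the same SciPy calls as the listing above (`pdist`, `linkage`, `fcluster`); the data are made up, and iterating over `np.unique(labels)` instead of `range(1, k + 1)` is a choice of this sketch so that cluster labels that do not occur are skipped.

```python
import numpy as np
from scipy.cluster.hierarchy import fcluster, linkage
from scipy.spatial.distance import pdist

# Illustrative points forming two well-separated groups.
points = np.array(
    [
        [0.0, 10.0],
        [1.0, 9.0],
        [2.0, 8.0],
        [8.0, 2.0],
        [9.0, 1.0],
        [10.0, 0.0],
    ]
)
k = 2

# Average-linkage hierarchical clustering, cut into at most k clusters.
tree = linkage(pdist(points, metric="euclidean"), method="average")
labels = fcluster(tree, k, criterion="maxclust")

# Pick, for each cluster, the member closest to the cluster centroid.
representatives = []
for cluster_id in np.unique(labels):
    members = points[labels == cluster_id]
    centroid = members.mean(axis=0)
    representatives.append(members[np.argmin(np.linalg.norm(members - centroid, axis=1))])
representatives = np.array(representatives)

print(representatives)
```

Because an actual member of each cluster is returned rather than the centroid itself, every representative is guaranteed to be one of the supplied non-dominated points.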

desdeo.mcdm.nautili

Methods for the NAUTILI (a group decision making variant for NAUTILUS) method.

NAUTILI_Response

Bases: BaseModel

The response of the NAUTILI method.

Source code in desdeo/mcdm/nautili.py
class NAUTILI_Response(BaseModel):
    """The response of the NAUTILI method."""

    step_number: int = Field(description="The step number associated with this response.")
    distance_to_front: float = Field(
        description=(
            "The distance travelled to the Pareto front. "
            "The distance is a ratio of the distances between the nadir and navigation point, and "
            "the nadir and the reachable objective vector. The distance is given in percentage."
        )
    )
    reference_points: dict | None = Field(description="The reference points used in the step.")
    improvement_directions: dict | None = Field(description="The improvement directions for each DM.")
    group_improvement_direction: dict | None = Field(description="The group improvement direction.")
    navigation_point: dict = Field(description="The navigation point used in the step.")
    reachable_solution: dict | None = Field(description="The reachable solution found in the step.")
    reachable_bounds: dict = Field(description="The reachable bounds found in the step.")

NautiliError

Bases: Exception

Exception raised for errors in the NAUTILI method.

Source code in desdeo/mcdm/nautili.py
class NautiliError(Exception):
    """Exception raised for errors in the NAUTILI method."""

nautili_init

nautili_init(
    problem: Problem, solver: BaseSolver | None = None
) -> NAUTILI_Response

Initializes the NAUTILI method.

Creates the initial response of the method, which sets the navigation point to the nadir point and the reachable bounds to the ideal and nadir points.

Parameters:

Name Type Description Default
problem Problem

The problem to be solved.

required
solver BaseSolver | None

The solver to use. Defaults to None.

None

Returns:

Name Type Description
NAUTILI_Response NAUTILI_Response

The initial response of the method.

Source code in desdeo/mcdm/nautili.py
def nautili_init(problem: Problem, solver: BaseSolver | None = None) -> NAUTILI_Response:
    """Initializes the NAUTILI method.

    Creates the initial response of the method, which sets the navigation point to the nadir point
    and the reachable bounds to the ideal and nadir points.

    Args:
        problem (Problem): The problem to be solved.
        solver (BaseSolver | None, optional): The solver to use. Defaults to None.

    Returns:
        NAUTILI_Response: The initial response of the method.
    """
    nav_point = get_nadir_dict(problem)
    lower_bounds, upper_bounds = solve_reachable_bounds(problem, nav_point, solver=solver)
    return NAUTILI_Response(
        distance_to_front=0,
        navigation_point=nav_point,
        reachable_bounds={"lower_bounds": lower_bounds, "upper_bounds": upper_bounds},
        reachable_solution=None,
        reference_points=None,
        improvement_directions=None,
        group_improvement_direction=None,
        step_number=0,
    )

solve_reachable_bounds

solve_reachable_bounds(
    problem: Problem,
    navigation_point: dict[str, float],
    solver: BaseSolver | None = None,
) -> tuple[dict[str, float], dict[str, float]]

Computes the current reachable (upper and lower) bounds of the solutions in the objective space.

The reachable bounds are computed based on the current navigation point. The bounds are computed by solving an epsilon constraint problem.

Parameters:

Name Type Description Default
problem Problem

the problem being solved.

required
navigation_point dict[str, float]

the navigation point limiting the reachable area. The key is the objective function's symbol and the value the navigation point.

required
solver BaseSolver | None

solver based on BaseSolver used to solve the problem. If None, then a solver is chosen based on the problem's properties. Defaults to None.

None

Raises:

Type Description
NautiliError

when optimization of an epsilon constraint problem is not successful.

Returns:

Type Description
tuple[dict[str, float], dict[str, float]]

tuple[dict[str, float], dict[str, float]]: a tuple of dicts, where the first dict contains the lower bounds and the second dict the upper bounds. The keys are the symbols of the objectives.

Source code in desdeo/mcdm/nautili.py
def solve_reachable_bounds(
    problem: Problem,
    navigation_point: dict[str, float],
    solver: BaseSolver | None = None
) -> tuple[dict[str, float], dict[str, float]]:
    """Computes the current reachable (upper and lower) bounds of the solutions in the objective space.

    The reachable bounds are computed based on the current navigation point. The bounds are computed by
    solving an epsilon constraint problem.

    Args:
        problem (Problem): the problem being solved.
        navigation_point (dict[str, float]): the navigation point limiting the
            reachable area. The key is the objective function's symbol and the value
            the navigation point.
        solver (BaseSolver | None, optional): solver based on BaseSolver used to solve the problem.
            If None, then a solver is chosen based on the problem's properties. Defaults to None.

    Raises:
        NautiliError: when optimization of an epsilon constraint problem is not successful.

    Returns:
        tuple[dict[str, float], dict[str, float]]: a tuple of dicts, where the first dict contains the lower bounds
            and the second dict the upper bounds. The keys are the symbols of the objectives.
    """
    # If an objective is to be maximized, then the navigation point component of that objective should be
    # multiplied by -1.
    const_bounds = {
        objective.symbol: -1 * navigation_point[objective.symbol]
        if objective.maximize
        else navigation_point[objective.symbol]
        for objective in problem.objectives
    }

    # if a solver creator was provided, use that, else, guess the best one
    solver_init = guess_best_solver(problem) if solver is None else solver

    lower_bounds = {}
    upper_bounds = {}
    for objective in problem.objectives:
        # Lower bounds
        eps_problem, target, _ = add_epsilon_constraints(
            problem,
            "target",
            {f"{obj.symbol}": f"{obj.symbol}_eps" for obj in problem.objectives},
            objective.symbol,
            const_bounds,
        )

        # solve
        solver = solver_init(eps_problem)
        res = solver.solve(target)

        if not res.success:
            # could not optimize eps problem
            msg = (
                f"Optimizing the epsilon constraint problem for the objective "
                f"{objective.symbol} was not successful. Reason: {res.message}"
            )
            raise NautiliError(msg)

        lower_bound = res.optimal_objectives[objective.symbol]

        if isinstance(lower_bound, list):
            lower_bound = lower_bound[0]

        # upper bounds
        # the upper bound is set as in the NAUTILUS method, i.e., taken from
        # the current iteration/navigation point
        if isinstance(navigation_point[objective.symbol], list):
            # It should never be a list according to the type hints
            upper_bound = navigation_point[objective.symbol][0]
        else:
            upper_bound = navigation_point[objective.symbol]

        # add the lower and upper bounds logically depending whether an objective is to be maximized or minimized
        lower_bounds[objective.symbol] = lower_bound if not objective.maximize else upper_bound
        upper_bounds[objective.symbol] = upper_bound if not objective.maximize else lower_bound

    return lower_bounds, upper_bounds
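
The handling of maximized objectives in the listing above can be followed without a solver. In the sketch below, `Obj` is a hypothetical stand-in for DESDEO's objective type, and `best_reachable` holds made-up values in place of the results that the epsilon constraint problems would return (assumed here to be given in the original sense of each objective). It reproduces only the sign flip of the constraint bounds and the final assignment of lower and upper bounds.

```python
from dataclasses import dataclass


@dataclass
class Obj:
    """Hypothetical stand-in for an objective with a symbol and a direction."""

    symbol: str
    maximize: bool


objectives = [Obj("f1", maximize=False), Obj("f2", maximize=True)]
navigation_point = {"f1": 4.0, "f2": 7.0}

# Maximized objectives are multiplied by -1 so that every constraint reads as "<=".
const_bounds = {
    obj.symbol: -1 * navigation_point[obj.symbol] if obj.maximize else navigation_point[obj.symbol]
    for obj in objectives
}

# Made-up stand-in for the best values found by the epsilon constraint problems.
best_reachable = {"f1": 1.5, "f2": 9.0}

lower_bounds, upper_bounds = {}, {}
for obj in objectives:
    best = best_reachable[obj.symbol]
    nav = navigation_point[obj.symbol]
    # For a maximized objective the navigation point is the lower end of the range.
    lower_bounds[obj.symbol] = best if not obj.maximize else nav
    upper_bounds[obj.symbol] = nav if not obj.maximize else best

print(const_bounds, lower_bounds, upper_bounds)
```

With this convention `lower_bounds[s] <= upper_bounds[s]` holds numerically for every objective, regardless of whether the objective is minimized or maximized.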

solve_reachable_solution

solve_reachable_solution(
    problem: Problem,
    group_improvement_direction: dict[str, float],
    previous_nav_point: dict[str, float],
    solver: BaseSolver | None = None,
    solver_options: SolverOptions | None = None,
) -> SolverResults

Calculates the reachable solution on the Pareto optimal front.

For the calculation to make sense in the context of NAUTILI, the reference point should be bounded by the reachable bounds present at the navigation step the reference point has been given.

In practice, the reachable solution is calculated by solving an achievement scalarizing function.

Parameters:

Name Type Description Default
problem Problem

the problem being solved.

required
group_improvement_direction dict[str, float]

the improvement direction for the group.

required
previous_nav_point dict[str, float]

the previous navigation point. The reachable solution found is always better than the previous navigation point.

required
solver BaseSolver | None

solver based on BaseSolver used to solve the problem. If None, then a solver is chosen based on the problem's properties. Defaults to None.

None
solver_options SolverOptions | None

optional options passed to the solver. Ignored if solver is None. Defaults to None.

None

Returns:

Type Description
SolverResults

SolverResults: the results of the projection.

Source code in desdeo/mcdm/nautili.py
def solve_reachable_solution(
    problem: Problem,
    group_improvement_direction: dict[str, float],
    previous_nav_point: dict[str, float],
    solver: BaseSolver | None = None,
    solver_options: SolverOptions | None = None,
) -> SolverResults:
    """Calculates the reachable solution on the Pareto optimal front.

    For the calculation to make sense in the context of NAUTILI, the reference point
    should be bounded by the reachable bounds present at the navigation step the
    reference point has been given.

    In practice, the reachable solution is calculated by solving an achievement
    scalarizing function.

    Args:
        problem (Problem): the problem being solved.
        group_improvement_direction (dict[str, float]): the improvement direction for the group.
        previous_nav_point (dict[str, float]): the previous navigation point. The reachable solution found
            is always better than the previous navigation point.
        solver (BaseSolver | None, optional): solver based on BaseSolver used to solve the problem.
            If None, then a solver is chosen based on the problem's properties. Defaults to None.
        solver_options (SolverOptions | None, optional): optional options passed
            to the `solver`. Ignored if `solver` is `None`.
            Defaults to None.

    Returns:
        SolverResults: the results of the projection.
    """
    # check solver
    init_solver = guess_best_solver(problem) if solver is None else solver

    # create and add scalarization function
    # previous_nav_point = objective_dict_to_numpy_array(problem, previous_nav_point).tolist()
    # weights = objective_dict_to_numpy_array(problem, group_improvement_direction).tolist()
    if problem.is_twice_differentiable:
        problem_w_asf, target = add_asf_generic_diff(
            problem,
            symbol="asf",
            reference_point=previous_nav_point,
            weights=group_improvement_direction,
            reference_point_aug=previous_nav_point,
        )
    else:
        problem_w_asf, target = add_asf_generic_nondiff(
            problem,
            symbol="asf",
            reference_point=previous_nav_point,
            weights=group_improvement_direction,
            reference_point_aug=previous_nav_point,
        )

    # Note: We do not solve the global problem. Instead, we solve this constrained problem:
    problem_w_asf = problem_w_asf.add_constraints(
        [
            Constraint(
                name=f"_const_{i+1}",
                symbol=f"_const_{i+1}",
                func=f"{obj.symbol}_min - {previous_nav_point[obj.symbol] * (-1 if obj.maximize else 1)}",
                cons_type=ConstraintTypeEnum.LTE,
                is_linear=obj.is_linear,
                is_convex=obj.is_convex,
                is_twice_differentiable=obj.is_twice_differentiable,
            )
            for i, obj in enumerate(problem_w_asf.objectives)
        ]
    )

    # solve the problem
    solver = init_solver(problem_w_asf)
    return solver.solve(target)

desdeo.mcdm.nautilus

Functions related to the NAUTILUS 1/2 method are defined here.

Reference of the method:

TODO: update

NAUTILUS_Response

Bases: BaseModel

The response of the NAUTILUS method.

Source code in desdeo/mcdm/nautilus.py
class NAUTILUS_Response(BaseModel):  # NOQA: N801
    """The response of the NAUTILUS method."""

    step_number: int = Field(description="The step number associated with this response.")
    distance_to_front: float = Field(
        description=(
            "The distance travelled to the Pareto front. "
            "The distance is a ratio of the distances between the nadir and navigation point, and "
            "the nadir and the reachable objective vector. The distance is given in percentage."
        )
    )
    preference: dict | None = Field(
        description="The preference used in the step. For now assumed that it is a reference point"
    )
    # preference_method: dict | None = Field(description="The preference method used in the step.")
    # improvement_direction: dict | None = Field(description="The improvement direction.")
    navigation_point: dict = Field(description="The navigation point used in the step.")
    reachable_solution: dict | None = Field(description="The reachable solution found in the step.")
    reachable_bounds: dict = Field(description="The reachable bounds found in the step.")

NautilusError

Bases: Exception

Raised when an exception is encountered with procedures related to NAUTILUS.

Source code in desdeo/mcdm/nautilus.py
class NautilusError(Exception):
    """Raised when an exception is encountered with procedures related to NAUTILUS."""

__nautilus_all_steps

__nautilus_all_steps(
    problem: Problem,
    steps_remaining: int,
    preference: dict,
    previous_responses: list[NAUTILUS_Response],
    solver: BaseSolver | None = None,
) -> list[NAUTILUS_Response]

Performs all steps of the NAUTILUS method.

NAUTILUS needs to be initialized before calling this function. Once initialized, this function performs all steps of the method. However, this method need not start from the beginning. The method conducts "steps_remaining" number of steps from the last navigation point. The last navigation point is taken from the last response in the "previous_responses" list. The first step in this algorithm always involves recalculating the reachable solution. All subsequent steps are precalculated without recalculating the reachable solution, with the assumption that the reference point has not changed. It is up to the user to only show the steps that the DM thinks they have taken.

Parameters:

Name Type Description Default
problem Problem

The problem to be solved.

required
steps_remaining int

The number of steps remaining.

required
preference dict

The points provided by the DM for defining preference.

required
previous_responses list[NAUTILUS_Response]

The previous responses of the method.

required
solver BaseSolver | None

The solver to use. Defaults to None, in which case the algorithm will guess the best solver for the problem.

None

Returns:

Type Description
list[NAUTILUS_Response]

list[NAUTILUS_Response]: The new responses of the method after all steps. Note, as only new responses are returned, the length of the list is equal to "steps_remaining". The analyst should append these responses to the "previous_responses" list to keep track of the entire process.

Source code in desdeo/mcdm/nautilus.py
def __nautilus_all_steps(
    problem: Problem,
    steps_remaining: int,
    preference: dict,
    previous_responses: list[NAUTILUS_Response],
    solver: BaseSolver | None = None,
) -> list[NAUTILUS_Response]:
    """Performs all steps of the NAUTILUS method.

    NAUTILUS needs to be initialized before calling this function. Once initialized, this function performs all
    steps of the method. However, this method need not start from the beginning. The method conducts "steps_remaining"
    number of steps from the last navigation point. The last navigation point is taken from the last response in
    "previous_responses" list. The first step in this algorithm always involves recalculating the reachable solution.
    All subsequent steps are precalculated without recalculating the reachable solution, with the assumption that the
    reference point has not changed. It is up to the user to only show the steps that the DM thinks they have taken.

    Args:
        problem (Problem): The problem to be solved.
        steps_remaining (int): The number of steps remaining.
        preference (dict): The points provided by the DM for defining preference.
        previous_responses (list[NAUTILUS_Response]): The previous responses of the method.
        solver (BaseSolver | None, optional): The solver to use. Defaults to None, in which case the
            algorithm will guess the best solver for the problem.

    Returns:
        list[NAUTILUS_Response]: The new responses of the method after all steps. Note, as only new responses are
            returned, the length of the list is equal to "steps_remaining". The analyst should append these responses
            to the "previous_responses" list to keep track of the entire process.
    """
    responses: list[NAUTILUS_Response] = []
    nav_point = previous_responses[-1].navigation_point
    step_number = previous_responses[-1].step_number + 1
    first_iteration = True
    reachable_solution = dict
    while steps_remaining > 0:
        if first_iteration:
            response = nautilus_step(
                problem,
                steps_remaining=steps_remaining,
                step_number=step_number,
                nav_point=nav_point,
                points=preference,
                solver=solver,
            )
            first_iteration = False
        else:
            response = nautilus_step(
                problem,
                steps_remaining=steps_remaining,
                step_number=step_number,
                nav_point=nav_point,
                # nautilus_step accepts points or ranks; it has no reachable_solution parameter.
                points=preference,
                solver=solver,
            )
        response.preference = preference
        responses.append(response)
        reachable_solution = response.reachable_solution
        nav_point = response.navigation_point
        steps_remaining -= 1
        step_number += 1
    return responses
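
The precalculation of the remaining steps can be illustrated without DESDEO. The sketch below is a simplified stand-in, not the library's implementation: it assumes the reachable solution stays fixed (the DM's preference does not change) and applies the same update rule as `calculate_navigation_point`. The name `precalculate_steps` and the plain-dict inputs are hypothetical.

```python
def precalculate_steps(
    nav_point: dict[str, float],
    reachable: dict[str, float],
    steps_remaining: int,
    step_number: int = 1,
) -> list[tuple[int, dict[str, float]]]:
    """Return (step_number, navigation_point) pairs for all remaining steps.

    Hypothetical helper: the reachable solution is held fixed, as it would be
    when the preference does not change between steps.
    """
    steps = []
    while steps_remaining > 0:
        # Move 1/steps_remaining of the remaining way towards the reachable solution.
        nav_point = {
            key: (steps_remaining - 1) / steps_remaining * nav_point[key]
            + reachable[key] / steps_remaining
            for key in nav_point
        }
        steps.append((step_number, nav_point))
        steps_remaining -= 1
        step_number += 1
    return steps


nadir = {"f_1": 10.0, "f_2": 5.0}
reachable = {"f_1": 2.0, "f_2": 1.0}
steps = precalculate_steps(nadir, reachable, steps_remaining=4)
for number, point in steps:
    print(number, point)
```

One entry is produced per remaining step, and the final navigation point coincides with the reachable solution.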

calculate_preferential_factors

calculate_preferential_factors()

TODO: implement.

Source code in desdeo/mcdm/nautilus.py
def calculate_preferential_factors():
    """TODO: implement."""

get_current_path

get_current_path(
    all_responses: list[NAUTILUS_Response],
) -> list[int]

Get the path of the current responses.

All responses may contain steps that the DM has gone back on. This function returns the path of the current active path being followed by the DM. The path is a list of indices of the responses in the "all_responses" list. Note that the path includes all steps until reaching the Pareto front (or whatever the last response is). It is up to the analyst/GUI to only show the steps that the DM has taken.

Parameters:

Name Type Description Default
all_responses list[NAUTILUS_Response]

All responses returned by the NAUTILUS method.

required

Returns:

Type Description
list[int]

list[int]: The path of the current active responses.

Source code in desdeo/mcdm/nautilus.py
def get_current_path(all_responses: list[NAUTILUS_Response]) -> list[int]:
    """Get the path of the current responses.

    All responses may contain steps that the DM has gone back on. This function returns the path of the current active
    path being followed by the DM. The path is a list of indices of the responses in the "all_responses" list. Note that
    the path includes all steps until reaching the Pareto front (or whatever the last response is). It is up to the
    analyst/GUI to only show the steps that the DM has taken.

    Args:
        all_responses (list[NAUTILUS_Response]): All responses returned by the NAUTILUS method.

    Returns:
        list[int]: The path of the current active responses.
    """
    total_steps = all_responses[-1].step_number
    current_index = len(all_responses) - 1
    path: list[int] = [current_index]
    total_steps -= 1

    while total_steps >= 0:
        found_step = False
        while not found_step:
            current_index -= 1
            if all_responses[current_index].step_number == total_steps:
                path.append(current_index)
                found_step = True
        total_steps -= 1
    return list(reversed(path))
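
To see what the returned path looks like after a step back, the following sketch may help. It mirrors the backward walk on a hypothetical stand-in class `Response` that only stores `step_number`; it is not the real `NAUTILUS_Response` model, and `current_path` is an illustrative name.

```python
from dataclasses import dataclass


@dataclass
class Response:
    """Hypothetical stand-in for NAUTILUS_Response; only step_number is kept."""

    step_number: int


def current_path(all_responses: list[Response]) -> list[int]:
    """Walk backwards, taking the latest earlier response for each step number."""
    index = len(all_responses) - 1
    path = [index]
    wanted = all_responses[-1].step_number - 1
    while wanted >= 0:
        index -= 1
        if all_responses[index].step_number == wanted:
            path.append(index)
            wanted -= 1
    return list(reversed(path))


# Steps 0..3 were computed first (indices 0..3). The DM then went back to
# step 1 and steps 2 and 3 were recomputed (indices 4 and 5).
responses = [Response(n) for n in (0, 1, 2, 3, 2, 3)]
path = current_path(responses)
print(path)
```

The abandoned responses at indices 2 and 3 are skipped, so the active path here is `[0, 1, 4, 5]`.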

nautilus_init

nautilus_init(
    problem: Problem, solver: BaseSolver | None = None
) -> NAUTILUS_Response

Initializes the NAUTILUS method.

Creates the initial response of the method, which sets the navigation point to the nadir point and the reachable bounds to the ideal and nadir points.

Parameters:

Name Type Description Default
problem Problem

The problem to be solved.

required
solver BaseSolver | None

The solver to use. Defaults to None.

None

Returns:

Name Type Description
NAUTILUS_Response NAUTILUS_Response

The initial response of the method.

Source code in desdeo/mcdm/nautilus.py
def nautilus_init(problem: Problem, solver: BaseSolver | None = None) -> NAUTILUS_Response:
    """Initializes the NAUTILUS method.

    Creates the initial response of the method, which sets the navigation point to the nadir point
    and the reachable bounds to the ideal and nadir points.

    Args:
        problem (Problem): The problem to be solved.
        solver (BaseSolver | None, optional): The solver to use. Defaults to None.

    Returns:
        NAUTILUS_Response: The initial response of the method.
    """
    nav_point = get_nadir_dict(problem)
    lower_bounds, upper_bounds = solve_reachable_bounds(problem, nav_point, solver=solver)
    return NAUTILUS_Response(
        distance_to_front=0,
        navigation_point=nav_point,
        reachable_bounds={"lower_bounds": lower_bounds, "upper_bounds": upper_bounds},
        reachable_solution=None,
        preference=None,
        step_number=0,
    )

nautilus_step

nautilus_step(
    problem: Problem,
    steps_remaining: int,
    step_number: int,
    nav_point: dict,
    solver: BaseSolver | None = None,
    points: dict[str, float] | None = None,
    ranks: dict[str, int] | None = None,
) -> NAUTILUS_Response

Performs a step of the NAUTILUS method.

Parameters:

Name Type Description Default
problem Problem

The problem to be solved.

required
steps_remaining int

The number of steps remaining.

required
step_number int

The current step number. Just used for the response.

required
nav_point dict

The current navigation point.

required
solver BaseSolver | None

The solver to use. Defaults to None.

None
points dict[str, float] | None

The points of the objectives. Defaults to None.

None
ranks dict[str, int] | None

The ranks of the objectives. Defaults to None.

None

Raises:

Type Description
NautilusError

If neither points nor ranks is provided.

NautilusError

If both points and ranks are provided.

Returns:

Name Type Description
NAUTILUS_Response NAUTILUS_Response

The response of the method after the step.

Source code in desdeo/mcdm/nautilus.py
def nautilus_step(
    problem: Problem,
    steps_remaining: int,
    step_number: int,
    nav_point: dict,
    solver: BaseSolver | None = None,
    points: dict[str, float] | None = None,
    ranks: dict[str, int] | None = None,
) -> NAUTILUS_Response:
    """Performs a step of the NAUTILUS method.

    Args:
        problem (Problem): The problem to be solved.
        steps_remaining (int): The number of steps remaining.
        step_number (int): The current step number. Just used for the response.
        nav_point (dict): The current navigation point.
        solver (BaseSolver | None, optional): The solver to use. Defaults to None.
        points (dict[str, float] | None, optional): The points of the objectives. Defaults to None.
        ranks (dict[str, int] | None, optional): The ranks of the objectives. Defaults to None.

    Raises:
        NautilusError: If neither points nor ranks is provided.
        NautilusError: If both points and ranks are provided.

    Returns:
        NAUTILUS_Response: The response of the method after the step.
    """
    if points is None and ranks is None:
        raise NautilusError("Either points or ranks must be provided.")
    if points is not None and ranks is not None:
        raise NautilusError("Both points and ranks cannot be provided.")

    # get weights
    if points is not None:  # noqa: SIM108
        weights = points_to_weights(points, problem)
    else:
        weights = ranks_to_weights(ranks, problem)

    # calculate reachable solution (direction).
    # This is inefficient as it is recalculated even if preferences do not change.
    opt_result = solve_reachable_solution(problem, weights, nav_point, solver)

    if not opt_result.success:
        warn(message="The solver did not converge.", stacklevel=2)

    reachable_point = opt_result.optimal_objectives

    # update nav point
    new_nav_point = calculate_navigation_point(problem, nav_point, reachable_point, steps_remaining)

    # update_bounds
    lower_bounds, upper_bounds = solve_reachable_bounds(problem, new_nav_point, solver)

    distance = calculate_distance_to_front(problem, new_nav_point, reachable_point)

    return NAUTILUS_Response(
        step_number=step_number,
        distance_to_front=distance,
        navigation_point=new_nav_point,
        reachable_solution=reachable_point,
        preference=ranks if ranks is not None else points,
        reachable_bounds={"lower_bounds": lower_bounds, "upper_bounds": upper_bounds},
    )

points_to_weights

points_to_weights(
    points: dict[str, float], problem: Problem
) -> dict[str, float]

Convert points to weights.

The points are converted to weights using the following formula: weight = (point / 100) * (nadir - utopian). The points must sum to 100.

Parameters:

Name Type Description Default
points dict[str, float]

The points of the objectives.

required
problem Problem

The problem being solved.

required

Returns:

Type Description
dict[str, float]

dict[str, float]: The weights calculated from the points.

Source code in desdeo/mcdm/nautilus.py
def points_to_weights(points: dict[str, float], problem: Problem) -> dict[str, float]:
    """Convert points to weights.

    The points are converted to weights using the following formula:
    weight = (point / 100) * (nadir - utopian). The points must sum to 100.

    Args:
        points (dict[str, float]): The points of the objectives.
        problem (Problem): The problem being solved.

    Returns:
        dict[str, float]: The weights calculated from the points.
    """
    nadir = get_nadir_dict(problem)
    ideal = get_ideal_dict(problem)
    tol = 1e-6
    weights = {}
    check_sum = 0
    for key, point in points.items():
        max_mult = next(obj.maximize for obj in problem.objectives if obj.symbol == key)
        max_mult = -1 if max_mult else 1
        weights[key] = point / 100 * (nadir[key] * max_mult - ideal[key] * max_mult + tol)
        check_sum += point
    if check_sum != 100:  # noqa: PLR2004
        raise ValueError(f"The sum of the points must be 100. The sum is {check_sum}.")
    return weights
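
As a worked example of the conversion, the sketch below reproduces the arithmetic with plain dictionaries instead of a `Problem` object. The function name `points_to_weights_sketch` and the `ideal`, `nadir`, and `maximize` dictionaries are assumptions made for illustration only.

```python
def points_to_weights_sketch(
    points: dict[str, float],
    ideal: dict[str, float],
    nadir: dict[str, float],
    maximize: dict[str, bool],
    tol: float = 1e-6,
) -> dict[str, float]:
    """Scale each objective's share of 100 points by its nadir-to-utopian range."""
    total = sum(points.values())
    if total != 100:
        raise ValueError(f"The sum of the points must be 100. The sum is {total}.")
    weights = {}
    for key, point in points.items():
        # Maximized objectives are flipped so the range is always positive.
        sign = -1 if maximize[key] else 1
        weights[key] = point / 100 * (sign * nadir[key] - sign * ideal[key] + tol)
    return weights


ideal = {"f_1": 0.0, "f_2": 5.0}
nadir = {"f_1": 10.0, "f_2": 1.0}
maximize = {"f_1": False, "f_2": True}
weights = points_to_weights_sketch({"f_1": 60, "f_2": 40}, ideal, nadir, maximize)
print(weights)
```

Here `f_1` receives roughly 0.6 * 10 = 6 and `f_2` roughly 0.4 * 4 = 1.6, each slightly larger because of the tolerance term.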

ranks_to_weights

ranks_to_weights(
    ranks: dict[str, int], problem: Problem
) -> dict[str, float]

Convert ranks to weights.

The ranks are converted to weights using the following formula: weight = rank * (nadir - utopian). Note that this means that a lower rank is worse.

Parameters:

Name Type Description Default
ranks dict[str, int]

The ranks of the objectives.

required
problem Problem

The problem being solved.

required

Returns:

Type Description
dict[str, float]

dict[str, float]: The weights calculated from the ranks.

Source code in desdeo/mcdm/nautilus.py
def ranks_to_weights(ranks: dict[str, int], problem: Problem) -> dict[str, float]:
    """Convert ranks to weights.

    The ranks are converted to weights using the following formula:
    weight = rank * (nadir - utopian). Note that this means that a lower rank is worse.

    Args:
        ranks (dict[str, int]): The ranks of the objectives.
        problem (Problem): The problem being solved.

    Returns:
        dict[str, float]: The weights calculated from the ranks.
    """
    nadir = get_nadir_dict(problem)
    ideal = get_ideal_dict(problem)
    tol = 1e-10
    weights = {}
    for key, rank in ranks.items():
        max_mult = next(obj.maximize for obj in problem.objectives if obj.symbol == key)
        max_mult = -1 if max_mult else 1
        weights[key] = rank * (nadir[key] * max_mult - ideal[key] * max_mult + tol)
    return weights
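
The effect of the ranks is easiest to see when the objectives share the same range. The following sketch uses plain dictionaries in place of a `Problem`; `ranks_to_weights_sketch` and its inputs are hypothetical names, not part of the module.

```python
def ranks_to_weights_sketch(
    ranks: dict[str, int],
    ideal: dict[str, float],
    nadir: dict[str, float],
    maximize: dict[str, bool],
    tol: float = 1e-10,
) -> dict[str, float]:
    """Multiply each rank by the objective's nadir-to-utopian range."""
    weights = {}
    for key, rank in ranks.items():
        # Maximized objectives are flipped so the range is always positive.
        sign = -1 if maximize[key] else 1
        weights[key] = rank * (sign * nadir[key] - sign * ideal[key] + tol)
    return weights


ideal = {"f_1": 0.0, "f_2": 0.0}
nadir = {"f_1": 10.0, "f_2": 10.0}
maximize = {"f_1": False, "f_2": False}
weights = ranks_to_weights_sketch({"f_1": 1, "f_2": 2}, ideal, nadir, maximize)
print(weights)
```

With identical ranges, the objective ranked 2 ends up with exactly twice the weight of the objective ranked 1.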

solve_reachable_solution

solve_reachable_solution(
    problem: Problem,
    weights: dict[str, float],
    previous_nav_point: dict[str, float],
    solver: BaseSolver | None = None,
) -> SolverResults

Calculates the reachable solution on the Pareto optimal front.

For the calculation to make sense in the context of NAUTILUS, the reference point should lie within the reachable bounds that were valid at the navigation step where the reference point was given.

In practice, the reachable solution is calculated by solving an achievement scalarizing function.

Parameters:

Name Type Description Default
problem Problem

the problem being solved.

required
weights dict[str, float]

the weights defining the direction of improvement. Must be calculated from the preference provided by the DM (weights, ranks, or reference point).

required
previous_nav_point dict[str, float]

the previous navigation point. The reachable solution found is always better than the previous navigation point.

required
solver BaseSolver | None

solver to solve the problem. If None, then a solver is chosen based on the problem's properties. Defaults to None.

None

Returns:

Name Type Description
SolverResults SolverResults

the results of the projection.

Source code in desdeo/mcdm/nautilus.py
def solve_reachable_solution(
    problem: Problem,
    weights: dict[str, float],
    # improvement_direction: dict[str, float],
    previous_nav_point: dict[str, float],
    solver: BaseSolver | None = None,
) -> SolverResults:
    """Calculates the reachable solution on the Pareto optimal front.

    For the calculation to make sense in the context of NAUTILUS, the reference point
    should lie within the reachable bounds that were valid at the navigation step where
    the reference point was given.

    In practice, the reachable solution is calculated by solving an achievement
    scalarizing function.

    Args:
        problem (Problem): the problem being solved.
        weights (dict[str, float]): the weights defining the direction of improvement. Must be calculated
            from the preference provided by the DM (weights, ranks, or reference point).
        previous_nav_point (dict[str, float]): the previous navigation point. The reachable solution found
            is always better than the previous navigation point.
        solver (BaseSolver | None, optional): solver to solve the problem.
            If None, then a solver is chosen based on the problem's properties. Defaults to None.

    Returns:
        SolverResults: the results of the projection.
    """
    # check solver
    init_solver = guess_best_solver(problem) if solver is None else solver

    # need to convert the preferences to preferential factors?

    # create and add scalarization function
    if problem.is_twice_differentiable:
        # differentiable problem
        problem_w_asf, target = add_asf_generic_diff(
            problem,
            symbol="asf",
            reference_point=previous_nav_point,
            weights=weights,
            reference_point_aug=previous_nav_point,
        )
    else:
        # non-differentiable problem
        problem_w_asf, target = add_asf_generic_nondiff(
            problem,
            symbol="asf",
            reference_point=previous_nav_point,
            weights=weights,
            reference_point_aug=previous_nav_point,
        )

    # Note: We do not solve the global problem. Instead, we solve this constrained problem:
    problem_w_asf = problem_w_asf.add_constraints(
        [
            Constraint(
                name=f"_const_{i + 1}",
                symbol=f"_const_{i + 1}",
                func=f"{obj.symbol}_min - {previous_nav_point[obj.symbol] * (-1 if obj.maximize else 1)}",
                cons_type=ConstraintTypeEnum.LTE,
                is_linear=obj.is_linear,
                is_convex=obj.is_convex,
                is_twice_differentiable=obj.is_twice_differentiable,
            )
            for i, obj in enumerate(problem_w_asf.objectives)
        ]
    )

    # solve the problem
    solver = init_solver(problem_w_asf)
    return solver.solve(target)
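
The extra constraints keep the reachable solution at least as good as the previous navigation point in every objective. The sketch below only rebuilds the constraint expressions as strings, using `(symbol, maximize)` pairs in place of DESDEO `Objective` instances; `bound_constraint_exprs` is a hypothetical helper, and the reading of the `_min` suffix as the objective in minimization form is an assumption based on the sign flip in the source.

```python
def bound_constraint_exprs(
    objectives: list[tuple[str, bool]],
    nav_point: dict[str, float],
) -> list[str]:
    """Build one 'f_min - bound' expression per objective.

    Each expression is meant to be constrained as '<= 0', so the objective
    (in minimization form) may not exceed the navigation point value.
    """
    exprs = []
    for symbol, maximize in objectives:
        # The bound of a maximized objective is negated to match minimization form.
        bound = nav_point[symbol] * (-1 if maximize else 1)
        exprs.append(f"{symbol}_min - {bound}")
    return exprs


objectives = [("f_1", False), ("f_2", True)]
nav_point = {"f_1": 8.0, "f_2": 3.0}
exprs = bound_constraint_exprs(objectives, nav_point)
print(exprs)
```

For the maximized objective `f_2`, the expression `f_2_min - -3.0 <= 0` is equivalent to requiring `f_2 >= 3.0`.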

step_back_index

step_back_index(
    responses: list[NAUTILUS_Response], step_number: int
) -> int

Find the index of the response with the given step number.

Note, multiple responses can have the same step number. This may happen if the DM takes a step back. In this case, the latest response with the given step number is returned. Note, as we precalculate all the responses, it is up to the analyst to show the steps that the DM thinks they have taken. Without this, the DM may be confused. In the worst case, the DM may take a step "back to the future".

Parameters:

Name Type Description Default
responses list[NAUTILUS_Response]

Responses returned by the NAUTILUS method.

required
step_number int

The step number to go back to.

required

Returns:

Name Type Description
int int

The index of the response with the given step number.

Source code in desdeo/mcdm/nautilus.py
def step_back_index(responses: list[NAUTILUS_Response], step_number: int) -> int:
    """Find the index of the response with the given step number.

    Note, multiple responses can have the same step
    number. This may happen if the DM takes a step back. In this case, the latest response with the given step number
    is returned. Note, as we precalculate all the responses, it is up to the analyst to show the steps that the DM
    thinks they have taken. Without this, the DM may be confused. In the worst case, the DM may take a step "back to
    the future".

    Args:
        responses (list[NAUTILUS_Response]): Responses returned by the NAUTILUS method.
        step_number (int): The step number to go back to.

    Returns:
        int : The index of the response with the given step number.
    """
    relevant_indices = [i for i, response in enumerate(responses) if response.step_number == step_number]
    # Choose latest index
    return relevant_indices[-1]
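
A small example shows which index is selected when a step number occurs more than once. The sketch uses `types.SimpleNamespace` objects as stand-ins for responses. The function name `latest_index_of_step` is hypothetical, and the explicit error for a missing step number is an addition that the original function does not have.

```python
from types import SimpleNamespace


def latest_index_of_step(responses: list, step_number: int) -> int:
    """Return the index of the most recent response with the given step number."""
    matches = [i for i, r in enumerate(responses) if r.step_number == step_number]
    if not matches:
        # Added guard: the original would raise an IndexError here instead.
        raise ValueError(f"No response with step number {step_number}.")
    return matches[-1]


# Step 2 was computed twice: first at index 2, then again at index 4
# after the DM stepped back to step 1.
responses = [SimpleNamespace(step_number=n) for n in (0, 1, 2, 3, 2, 3)]
index = latest_index_of_step(responses, 2)
print(index)
```

The later occurrence at index 4 is chosen over the abandoned one at index 2.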

desdeo.mcdm.nautilus_navigator

Functions related to the NAUTILUS Navigator method are defined here.

Reference of the method:

Ruiz, Ana B., et al. "NAUTILUS Navigator: free search interactive multiobjective optimization without trading-off." Journal of Global Optimization 74.2 (2019): 213-231.

NAUTILUS_Response

Bases: BaseModel

The response of the NAUTILUS Navigator method.

Source code in desdeo/mcdm/nautilus_navigator.py
class NAUTILUS_Response(BaseModel):  # NOQA: N801
    """The response of the NAUTILUS Navigator method."""

    step_number: int = Field(description="The step number associated with this response.")
    distance_to_front: float = Field(
        description=(
            "The distance travelled to the Pareto front. "
            "The distance is a ratio of the distances between the nadir and navigation point, and "
            "the nadir and the reachable objective vector. The distance is given in percentage."
        )
    )
    reference_point: dict | None = Field(description="The reference point used in the step.")
    bounds: dict | None = Field(description="The user provided bounds.")
    navigation_point: dict = Field(description="The navigation point used in the step.")
    reachable_solution: dict | None = Field(description="The reachable solution found in the step.")
    reachable_bounds: dict = Field(description="The reachable bounds found in the step.")

NautilusNavigatorError

Bases: Exception

Raised when an exception is encountered with procedures related to NAUTILUS Navigator.

Source code in desdeo/mcdm/nautilus_navigator.py
class NautilusNavigatorError(Exception):
    """Raised when an exception is encountered with procedures related to NAUTILUS Navigator."""

calculate_distance_to_front

calculate_distance_to_front(
    problem: Problem,
    navigation_point: dict[str, float],
    reachable_objective_vector: dict[str, float],
) -> float

Calculates the distance to the Pareto optimal front from a navigation point.

It is assumed that a nadir point is defined for the problem.

Parameters:

Name Type Description Default
problem Problem

the problem being solved.

required
navigation_point dict[str, float]

the current navigation point.

required
reachable_objective_vector dict[str, float]

the current reachable objective vector from the navigation point.

required

Raises:

Type Description
NautilusNavigatorError

all or some of the components of the problem's nadir point are not defined.

Returns:

Name Type Description
float float

the distance to the front.

Source code in desdeo/mcdm/nautilus_navigator.py
def calculate_distance_to_front(
    problem: Problem, navigation_point: dict[str, float], reachable_objective_vector: dict[str, float]
) -> float:
    """Calculates the distance to the Pareto optimal front from a navigation point.

    It is assumed that a nadir point is defined for the problem.

    Args:
        problem (Problem): the problem being solved.
        navigation_point (dict[str, float]): the current navigation point.
        reachable_objective_vector (dict[str, float]): the current reachable objective vector from the navigation point.

    Raises:
        NautilusNavigatorError: all or some of the components of the problem's nadir point
            are not defined.

    Returns:
        float: the distance to the front.
    """
    nadir_point = objective_dict_to_numpy_array(problem, get_nadir_dict(problem))
    if None in nadir_point:
        msg = (
            f"Some or all the nadir values for the given problem are 'None': {nadir_point}. "
            "The nadir point must be fully defined."
        )
        raise NautilusNavigatorError(msg)

    z_nav = objective_dict_to_numpy_array(problem, navigation_point)
    f = objective_dict_to_numpy_array(problem, reachable_objective_vector)

    return (np.linalg.norm(z_nav - nadir_point) / np.linalg.norm(f - nadir_point)) * 100
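
The distance is the share of the way from the nadir point to the reachable objective vector that the navigation point has already covered. The sketch below computes the same ratio with plain dictionaries and the standard library; `distance_to_front` and its arguments are illustrative names, and the nadir point is passed in directly rather than read from a `Problem`.

```python
import math


def distance_to_front(
    nav_point: dict[str, float],
    reachable: dict[str, float],
    nadir: dict[str, float],
) -> float:
    """Return the travelled distance as a percentage of the total distance."""
    travelled = math.sqrt(sum((nav_point[k] - nadir[k]) ** 2 for k in nadir))
    total = math.sqrt(sum((reachable[k] - nadir[k]) ** 2 for k in nadir))
    return travelled / total * 100


nadir = {"f_1": 10.0, "f_2": 5.0}
reachable = {"f_1": 2.0, "f_2": 1.0}
distance = distance_to_front({"f_1": 8.0, "f_2": 4.0}, reachable, nadir)
print(distance)
```

A navigation point at the nadir gives 0, and one that coincides with the reachable objective vector gives 100. The example point lies a quarter of the way along, so the result is approximately 25.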

calculate_navigation_point

calculate_navigation_point(
    problem: Problem,
    previous_navigation_point: dict[str, float],
    reachable_objective_vector: dict[str, float],
    number_of_steps_remaining: int,
) -> dict[str, float]

Calculates the navigation point.

The navigation point is calculated from the previous navigation point, the number of navigation steps remaining, and the reachable objective vector from the new navigation point.

Parameters:

Name Type Description Default
problem Problem

the problem being solved.

required
previous_navigation_point dict[str, float]

the previous navigation point.

required
reachable_objective_vector dict[str, float]

the current reachable objective vector from the navigation point.

required
number_of_steps_remaining int

the number of steps remaining in the navigation. Must be greater than 0.

required

Raises:

Type Description
NautilusNavigatorError

when the given number of steps remaining is not greater than 0.

Returns:

Type Description
dict[str, float]

dict[str, float]: the navigation point.

Source code in desdeo/mcdm/nautilus_navigator.py
def calculate_navigation_point(
    problem: Problem,
    previous_navigation_point: dict[str, float],
    reachable_objective_vector: dict[str, float],
    number_of_steps_remaining: int,
) -> dict[str, float]:
    """Calculates the navigation point.

    The navigation point is calculated from the previous navigation
    point, the number of navigation steps remaining, and the reachable objective
    vector from the new navigation point.

    Args:
        problem (Problem): the problem being solved.
        previous_navigation_point (dict[str, float]): the previous navigation point.
        reachable_objective_vector (dict[str, float]): the current reachable objective vector from the navigation point.
        number_of_steps_remaining (int): the number of steps remaining in the navigation. Must be greater than 0.

    Raises:
        NautilusNavigatorError: when the given number of steps remaining is not greater than 0.

    Returns:
        dict[str, float]: the navigation point.
    """
    if number_of_steps_remaining <= 0:
        msg = f"The given number of steps remaining ({number_of_steps_remaining=}) must be greater than 0."
        raise NautilusNavigatorError(msg)

    z_prev = objective_dict_to_numpy_array(problem, previous_navigation_point)
    f = objective_dict_to_numpy_array(problem, reachable_objective_vector).T
    rs = number_of_steps_remaining

    # return the new navigation point
    z = ((rs - 1) / (rs)) * z_prev + f / rs

    return numpy_array_to_objective_dict(problem, z)
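
The update rule moves the navigation point a fraction 1/rs of the remaining way towards the reachable objective vector, where rs is the number of steps remaining. A minimal sketch with plain dictionaries follows; `navigation_point` is a hypothetical name, and a `ValueError` stands in for `NautilusNavigatorError`.

```python
def navigation_point(
    previous: dict[str, float],
    reachable: dict[str, float],
    steps_remaining: int,
) -> dict[str, float]:
    """Compute z = ((rs - 1) / rs) * z_prev + f / rs for each objective."""
    if steps_remaining <= 0:
        raise ValueError("The number of steps remaining must be greater than 0.")
    rs = steps_remaining
    return {key: (rs - 1) / rs * previous[key] + reachable[key] / rs for key in previous}


previous = {"f_1": 10.0, "f_2": 5.0}
reachable = {"f_1": 2.0, "f_2": 1.0}
next_point = navigation_point(previous, reachable, steps_remaining=4)
last_point = navigation_point(previous, reachable, steps_remaining=1)
print(next_point, last_point)
```

With four steps remaining the point moves a quarter of the way; with one step remaining it lands on the reachable objective vector itself.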

get_current_path

get_current_path(
    all_responses: list[NAUTILUS_Response],
) -> list[int]

Get the path of the current responses.

All responses may contain steps that the DM has gone back on. This function returns the path of the current active path being followed by the DM. The path is a list of indices of the responses in the "all_responses" list. Note that the path includes all steps until reaching the Pareto front (or whatever the last response is). It is up to the analyst/GUI to only show the steps that the DM has taken.

Parameters:

Name Type Description Default
all_responses list[NAUTILUS_Response]

All responses returned by the NAUTILUS method.

required

Returns:

Type Description
list[int]

list[int]: The path of the current active responses.

Source code in desdeo/mcdm/nautilus_navigator.py
def get_current_path(all_responses: list[NAUTILUS_Response]) -> list[int]:
    """Get the path of the current responses.

    All responses may contain steps that the DM has gone back on. This function returns the path of the current active
    path being followed by the DM. The path is a list of indices of the responses in the "all_responses" list. Note that
    the path includes all steps until reaching the Pareto front (or whatever the last response is). It is up to the
    analyst/GUI to only show the steps that the DM has taken.

    Args:
        all_responses (list[NAUTILUS_Response]): All responses returned by the NAUTILUS method.

    Returns:
        list[int]: The path of the current active responses.
    """
    total_steps = all_responses[-1].step_number
    current_index = len(all_responses) - 1
    path: list[int] = [current_index]
    total_steps -= 1

    while total_steps >= 0:
        found_step = False
        while not found_step:
            current_index -= 1
            if all_responses[current_index].step_number == total_steps:
                path.append(current_index)
                found_step = True
        total_steps -= 1
    return list(reversed(path))

navigator_all_steps

navigator_all_steps(
    problem: Problem,
    steps_remaining: int,
    reference_point: dict,
    previous_responses: list[NAUTILUS_Response],
    bounds: dict | None = None,
    solver: BaseSolver | None = None,
) -> list[NAUTILUS_Response]

Performs all steps of the NAUTILUS Navigator method.

NAUTILUS needs to be initialized before calling this function. Once initialized, this function performs all steps of the method. However, this method need not start from the beginning. The method conducts "steps_remaining" number of steps from the last navigation point. The last navigation point is taken from the last response in the "previous_responses" list. The first step in this algorithm always involves recalculating the reachable solution. All subsequent steps are precalculated without recalculating the reachable solution, with the assumption that the reference point has not changed. It is up to the user to only show the steps that the DM thinks they have taken.

Parameters:

Name Type Description Default
problem Problem

The problem to be solved.

required
steps_remaining int

The number of steps remaining.

required
reference_point dict

The reference point provided by the DM.

required
bounds dict | None

The bounds of the problem provided by the DM.

None
previous_responses list[NAUTILUS_Response]

The previous responses of the method.

required
solver BaseSolver | None

The solver to use. Defaults to None, in which case the algorithm will guess the best solver for the problem.

None

Returns:

Type Description
list[NAUTILUS_Response]

list[NAUTILUS_Response]: The new responses of the method after all steps. Note, as only new responses are returned, the length of the list is equal to "steps_remaining". The analyst should append these responses to the "previous_responses" list to keep track of the entire process.

Source code in desdeo/mcdm/nautilus_navigator.py
def navigator_all_steps(
    problem: Problem,
    steps_remaining: int,
    reference_point: dict,
    previous_responses: list[NAUTILUS_Response],
    bounds: dict | None = None,
    solver: BaseSolver | None = None,
) -> list[NAUTILUS_Response]:
    """Performs all steps of the NAUTILUS Navigator method.

    NAUTILUS needs to be initialized before calling this function. Once initialized, this function performs all
    steps of the method. However, this method need not start from the beginning. The method conducts "steps_remaining"
    number of steps from the last navigation point. The last navigation point is taken from the last response in
    "previous_responses" list. The first step in this algorithm always involves recalculating the reachable solution.
    All subsequent steps are precalculated without recalculating the reachable solution, with the assumption that the
    reference point has not changed. It is up to the user to only show the steps that the DM thinks they have taken.

    Args:
        problem (Problem): The problem to be solved.
        steps_remaining (int): The number of steps remaining.
        reference_point (dict): The reference point provided by the DM.
        bounds (dict | None, optional): The bounds of the problem provided by the DM. Defaults to None.
        previous_responses (list[NAUTILUS_Response]): The previous responses of the method.
        solver (BaseSolver | None, optional): The solver to use. Defaults to None, in which case the
            algorithm will guess the best solver for the problem.

    Returns:
        list[NAUTILUS_Response]: The new responses of the method after all steps. Note, as only new responses are
            returned, the length of the list is equal to "steps_remaining". The analyst should append these responses
            to the "previous_responses" list to keep track of the entire process.
    """
    responses: list[NAUTILUS_Response] = []
    nav_point = previous_responses[-1].navigation_point
    step_number = previous_responses[-1].step_number + 1
    first_iteration = True
    # Set after the first step; the first step does not read this value.
    reachable_solution: dict[str, float] | None = None
    while steps_remaining > 0:
        if first_iteration:
            response = navigator_step(
                problem,
                steps_remaining=steps_remaining,
                step_number=step_number,
                nav_point=nav_point,
                reference_point=reference_point,
                bounds=bounds,
                solver=solver,
            )
            first_iteration = False
        else:
            response = navigator_step(
                problem,
                steps_remaining=steps_remaining,
                step_number=step_number,
                nav_point=nav_point,
                reachable_solution=reachable_solution,
                bounds=bounds,
                solver=solver,
            )
        response.reference_point = reference_point
        responses.append(response)
        reachable_solution = response.reachable_solution
        nav_point = response.navigation_point
        steps_remaining -= 1
        step_number += 1
    return responses
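
The bookkeeping described above (only new responses are returned, and the caller extends the history) can be sketched without a solver. This is a minimal illustration, not the library's implementation: `Response` is a hypothetical stand-in for `NAUTILUS_Response` that keeps only two fields, and `fake_all_steps` is a made-up helper that copies the navigation point instead of solving anything.

```python
from dataclasses import dataclass


@dataclass
class Response:
    """Hypothetical stand-in for NAUTILUS_Response (only two fields kept)."""

    step_number: int
    navigation_point: dict[str, float]


def fake_all_steps(steps_remaining: int, previous: list[Response]) -> list[Response]:
    """Mimic the step counting of navigator_all_steps without any optimization."""
    step_number = previous[-1].step_number + 1
    nav_point = previous[-1].navigation_point
    new: list[Response] = []
    while steps_remaining > 0:
        # The real function would compute a new navigation point here.
        new.append(Response(step_number, dict(nav_point)))
        steps_remaining -= 1
        step_number += 1
    return new


history = [Response(step_number=0, navigation_point={"f_1": 10.0})]
new_responses = fake_all_steps(steps_remaining=5, previous=history)
history.extend(new_responses)  # the caller keeps track of the whole process
```

The returned list has exactly `steps_remaining` elements, and the step numbers continue from the last entry of the history.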

navigator_init

navigator_init(
    problem: Problem, solver: BaseSolver | None = None
) -> NAUTILUS_Response

Initializes the NAUTILUS method.

Creates the initial response of the method, which sets the navigation point to the nadir point and the reachable bounds to the ideal and nadir points.

Parameters:

Name Type Description Default
problem Problem

The problem to be solved.

required
solver BaseSolver | None

The solver to use. Defaults to None.

None

Returns:

Name Type Description
NAUTILUS_Response NAUTILUS_Response

The initial response of the method.

Source code in desdeo/mcdm/nautilus_navigator.py
def navigator_init(problem: Problem, solver: BaseSolver | None = None) -> NAUTILUS_Response:
    """Initializes the NAUTILUS method.

    Creates the initial response of the method, which sets the navigation point to the nadir point
    and the reachable bounds to the ideal and nadir points.

    Args:
        problem (Problem): The problem to be solved.
        solver (BaseSolver | None, optional): The solver to use. Defaults to None.

    Returns:
        NAUTILUS_Response: The initial response of the method.
    """
    nav_point = get_nadir_dict(problem)
    lower_bounds, upper_bounds = solve_reachable_bounds(problem, nav_point, solver=solver)
    return NAUTILUS_Response(
        distance_to_front=0,
        navigation_point=nav_point,
        reachable_bounds={"lower_bounds": lower_bounds, "upper_bounds": upper_bounds},
        reachable_solution=None,
        reference_point=None,
        bounds=None,
        step_number=0,
    )

navigator_step

navigator_step(
    problem: Problem,
    steps_remaining: int,
    step_number: int,
    nav_point: dict,
    bounds: dict | None = None,
    solver: BaseSolver | None = None,
    reference_point: dict | None = None,
    reachable_solution: dict[str, float] | None = None,
) -> NAUTILUS_Response

Performs a step of the NAUTILUS method.

Parameters:

Name Type Description Default
problem Problem

The problem to be solved.

required
steps_remaining int

The number of steps remaining.

required
step_number int

The current step number. Just used for the response.

required
nav_point dict

The current navigation point.

required
solver BaseSolver | None

The solver to use. Defaults to None.

None
reference_point dict | None

The reference point provided by the DM. Defaults to None, in which case it is assumed that the DM has not changed their preference. The algorithm uses the last reachable solution, which must be provided in this case.

None
bounds dict | None

The bounds of the problem provided by the DM. Defaults to None.

None
reachable_solution dict | None

The previous reachable solution. Must only be provided if the DM has not changed their preference. Defaults to None.

None

Raises:

Type Description
NautilusNavigatorError

If neither reference_point nor reachable_solution is provided.

NautilusNavigatorError

If both reference_point and reachable_solution are provided.

Returns:

Name Type Description
NAUTILUS_Response NAUTILUS_Response

The response of the method after the step.

Source code in desdeo/mcdm/nautilus_navigator.py
def navigator_step(
    problem: Problem,
    steps_remaining: int,
    step_number: int,
    nav_point: dict,
    bounds: dict | None = None,
    solver: BaseSolver | None = None,
    reference_point: dict | None = None,
    reachable_solution: dict[str, float] | None = None,
) -> NAUTILUS_Response:
    """Performs a step of the NAUTILUS method.

    Args:
        problem (Problem): The problem to be solved.
        steps_remaining (int): The number of steps remaining.
        step_number (int): The current step number. Just used for the response.
        nav_point (dict): The current navigation point.
        solver (BaseSolver | None, optional): The solver to use. Defaults to None.
        reference_point (dict | None, optional): The reference point provided by the DM. Defaults to None, in which
            case it is assumed that the DM has not changed their preference. The
            algorithm uses the last reachable solution,
            which must be provided in this case.
        bounds (dict | None, optional): The bounds of the problem provided by the DM. Defaults to None.
        reachable_solution (dict | None, optional): The previous reachable solution. Must only be provided if the DM
            has not changed their preference. Defaults to None.

    Raises:
        NautilusNavigatorError: If neither reference_point nor reachable_solution is provided.
        NautilusNavigatorError: If both reference_point and reachable_solution are provided.

    Returns:
        NAUTILUS_Response: The response of the method after the step.
    """
    if reference_point is None and reachable_solution is None:
        raise NautilusNavigatorError("Either reference_point or reachable_solution must be provided.")

    if reference_point is not None and reachable_solution is not None:
        raise NautilusNavigatorError("Only one of reference_point or reachable_solution should be provided.")

    if reference_point is not None:
        opt_result = solve_reachable_solution(problem, reference_point, nav_point, solver, bounds=bounds)
        reachable_point = opt_result.optimal_objectives
    elif reachable_solution is not None:
        reachable_point = reachable_solution

    # update nav point
    new_nav_point = calculate_navigation_point(problem, nav_point, reachable_point, steps_remaining)

    # update_bounds

    lower_bounds, upper_bounds = solve_reachable_bounds(problem, new_nav_point, solver=solver, bounds=bounds)

    distance = calculate_distance_to_front(problem, new_nav_point, reachable_point)

    if bounds is None:
        bounds = {obj.symbol: obj.nadir for obj in problem.objectives}

    return NAUTILUS_Response(
        step_number=step_number,
        distance_to_front=distance,
        navigation_point=new_nav_point,
        reachable_solution=reachable_point,
        reference_point=reference_point,
        reachable_bounds={"lower_bounds": lower_bounds, "upper_bounds": upper_bounds},
        bounds=bounds,
    )
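
The argument checks at the top of the listing enforce that exactly one of `reference_point` and `reachable_solution` is given. The sketch below isolates that rule under stated assumptions: `NavigatorArgumentError` is a hypothetical stand-in for `NautilusNavigatorError`, and `solve` is a made-up callable that replaces the call to `solve_reachable_solution`.

```python
class NavigatorArgumentError(Exception):
    """Hypothetical stand-in for NautilusNavigatorError."""


def pick_reachable_point(reference_point=None, reachable_solution=None, solve=None):
    """Return the reachable point, solving only when a reference point is given."""
    if reference_point is None and reachable_solution is None:
        raise NavigatorArgumentError("Either reference_point or reachable_solution must be provided.")
    if reference_point is not None and reachable_solution is not None:
        raise NavigatorArgumentError("Only one of reference_point or reachable_solution should be provided.")
    if reference_point is not None:
        # New preferences: project the reference point (placeholder solver).
        return solve(reference_point)
    # Unchanged preferences: reuse the previous reachable solution.
    return reachable_solution


def fake_solve(reference_point):
    """Made-up projection that simply echoes the reference point."""
    return dict(reference_point)


from_reference = pick_reachable_point(reference_point={"f_1": 2.0}, solve=fake_solve)
from_previous = pick_reachable_point(reachable_solution={"f_1": 3.0})
```

Passing a reference point signals changed preferences and triggers a new solve; passing the previous reachable solution skips the solve.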

solve_reachable_bounds

solve_reachable_bounds(
    problem: Problem,
    navigation_point: dict[str, float],
    bounds: dict[str, float] | None = None,
    solver: BaseSolver | None = None,
    bound_th: float = 0.001,
) -> tuple[dict[str, float], dict[str, float]]

Computes the current reachable (upper and lower) bounds of the solutions in the objective space.

The reachable bounds are computed based on the current navigation point. The bounds are computed by solving an epsilon constraint problem.

Parameters:

Name Type Description Default
problem Problem

the problem being solved.

required
navigation_point dict[str, float]

the navigation point limiting the reachable area. The key is the objective function's symbol and the value the navigation point.

required
bounds dict[str, float]

the user provided bounds preference.

None
solver BaseSolver | None

solver used to solve the problem. If None, then a solver is chosen based on the problem's properties. Defaults to None.

None
bound_th float

a threshold for comparing the bounds to the set epsilon constraints.

0.001

Raises:

Type Description
NautilusNavigatorError

when optimization of an epsilon constraint problem is not successful.

Returns:

Type Description
tuple[dict[str, float], dict[str, float]]

tuple[dict[str, float], dict[str, float]]: a tuple of dicts, where the first dict holds the lower bounds and the second the upper bounds. The keys are the symbols of the objectives.

Source code in desdeo/mcdm/nautilus_navigator.py
def solve_reachable_bounds(
    problem: Problem,
    navigation_point: dict[str, float],
    bounds: dict[str, float] | None = None,
    solver: BaseSolver | None = None,
    bound_th: float = 1e-3,
) -> tuple[dict[str, float], dict[str, float]]:
    """Computes the current reachable (upper and lower) bounds of the solutions in the objective space.

    The reachable bounds are computed based on the current navigation point. The bounds are computed by
    solving an epsilon constraint problem.

    Args:
        problem (Problem): the problem being solved.
        navigation_point (dict[str, float]): the navigation point limiting the
            reachable area. The key is the objective function's symbol and the value
            the navigation point.
        bounds (dict[str, float]): the user provided bounds preference.
        solver (BaseSolver | None, optional): solver used to solve the problem.
            If None, then a solver is chosen based on the problem's properties. Defaults to None.
        bound_th (float, optional): a threshold for comparing the bounds to the set epsilon constraints.

    Raises:
        NautilusNavigatorError: when optimization of an epsilon constraint problem is not successful.

    Returns:
        tuple[dict[str, float], dict[str, float]]: a tuple of dicts, where the first dict holds the lower bounds and the
            second the upper bounds. The keys are the symbols of the objectives.
    """
    # If an objective is to be maximized, then the navigation point component of that objective should be
    # multiplied by -1.
    const_bounds = {
        objective.symbol: -1 * navigation_point[objective.symbol]
        if objective.maximize
        else navigation_point[objective.symbol]
        for objective in problem.objectives
    }

    # if a solver creator was provided, use that, else, guess the best one
    solver_init = guess_best_solver(problem) if solver is None else solver

    lower_bounds = {}
    upper_bounds = {}
    for objective in problem.objectives:
        # Lower bounds
        eps_problem, target, _ = add_epsilon_constraints(
            problem,
            "target",
            {f"{obj.symbol}": f"{obj.symbol}_eps" for obj in problem.objectives},
            objective.symbol,
            const_bounds,
        )

        # User bounds
        if bounds is not None:
            bound_constraints = [
                Constraint(
                    name=f"User bound for {obj.symbol}",
                    symbol=f"{obj.symbol}_user",
                    func=f"{obj.symbol}_min - {bounds[obj.symbol] * (-1 if obj.maximize else 1)}",
                    cons_type=ConstraintTypeEnum.LTE,
                    is_linear=obj.is_linear,
                    is_convex=obj.is_convex,
                    is_twice_differentiable=obj.is_twice_differentiable,
                )
                for obj in problem.objectives
            ]
            eps_problem = eps_problem.add_constraints(bound_constraints)

        # solve
        solver = solver_init(eps_problem)
        res = solver.solve(target)

        if not res.success:
            # could not optimize eps problem
            msg = (
                f"Optimizing the epsilon constraint problem for the objective "
                f"{objective.symbol} was not successful. Reason: {res.message}"
            )
            raise NautilusNavigatorError(msg)

        lower_bound = res.optimal_objectives[objective.symbol]

        if isinstance(lower_bound, list):
            lower_bound = lower_bound[0]

        # Upper bounds
        eps_problem, target, _ = add_epsilon_constraints(
            problem,
            "target",
            {f"{obj.symbol}": f"{obj.symbol}_eps" for obj in problem.objectives},
            objective.symbol,
            const_bounds,
        )
        # We need to add a constraint related to the target objective to bound it to the navigation point
        # Maybe there should be a replacement to "create_epsilon_constraints_json" that allows for this
        # for now, we will add the constraint manually
        # target_expr[1] = -1  # maximize the objective
        target = "target"
        max_objective_scal = ScalarizationFunction(
            symbol=target,
            name="Max objective",
            func=["Negate", f"{objective.symbol}_min"],
            is_linear=objective.is_linear,
            is_convex=objective.is_convex,
            is_twice_differentiable=objective.is_twice_differentiable,
        )

        eps_problem = problem.add_scalarization(max_objective_scal)

        bound_to_nav_constraint = Constraint(
            symbol=f"{objective.symbol}_to_bound",
            name=f"To bound {objective.symbol} to user bounds",
            func=["Add", f"{objective.symbol}_min", ["Negate", const_bounds[objective.symbol]]],
            cons_type=ConstraintTypeEnum.LTE,
            is_linear=objective.is_linear,
            is_convex=objective.is_convex,
            is_twice_differentiable=objective.is_twice_differentiable,
        )

        # User bounds, add constraints
        if bounds is not None:
            bound_constraints = [
                Constraint(
                    name=f"User bound for {obj.symbol}",
                    symbol=f"{obj.symbol}_user",
                    func=f"{obj.symbol}_min - {bounds[obj.symbol] * (-1 if obj.maximize else 1)}",
                    cons_type=ConstraintTypeEnum.LTE,
                    is_linear=obj.is_linear,
                    is_convex=obj.is_convex,
                    is_twice_differentiable=obj.is_twice_differentiable,
                )
                for obj in problem.objectives
            ]
            eps_problem = eps_problem.add_constraints([bound_to_nav_constraint, *bound_constraints])
        else:
            eps_problem = eps_problem.add_constraints([bound_to_nav_constraint])

        # solve
        solver = solver_init(eps_problem)
        res = solver.solve(target)
        if not res.success:
            # could not optimize eps problem
            msg = (
                f"Optimizing the epsilon constraint problem for the objective "
                f"{objective.symbol} was not successful. Reason: {res.message}"
            )
            raise NautilusNavigatorError(msg)

        upper_bound = res.optimal_objectives[objective.symbol]

        if isinstance(upper_bound, list):
            upper_bound = upper_bound[0]

        if not (abs(upper_bound * (-1 if objective.maximize else 1) - const_bounds[objective.symbol]) < bound_th) and (
            upper_bound * (-1 if objective.maximize else 1) > const_bounds[objective.symbol]
        ):
            msg = "The upper bound is worse than the navigation point. This should not happen."
            raise NautilusNavigatorError(msg)

        # add the lower and upper bounds logically depending whether an objective is to be maximized or minimized
        lower_bounds[objective.symbol] = lower_bound if not objective.maximize else upper_bound
        upper_bounds[objective.symbol] = upper_bound if not objective.maximize else lower_bound

    return lower_bounds, upper_bounds
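
The sign handling for maximized objectives in the listing can be followed with plain dictionaries. This is a sketch under assumptions: `Objective` is a hypothetical stand-in with only `symbol` and `maximize`, the two "solver outputs" are made-up numbers, and the solver is assumed to report objective values in their original scale.

```python
from typing import NamedTuple


class Objective(NamedTuple):
    """Hypothetical stand-in for a problem objective (only two fields kept)."""

    symbol: str
    maximize: bool


objectives = [Objective("f_1", maximize=False), Objective("f_2", maximize=True)]
navigation_point = {"f_1": 4.0, "f_2": 7.0}

# Same sign flip as in the listing: maximized objectives are negated.
const_bounds = {
    obj.symbol: (-1 if obj.maximize else 1) * navigation_point[obj.symbol]
    for obj in objectives
}

# Made-up solver outputs in the original scale of each objective.
best_reachable = {"f_1": 1.0, "f_2": 9.0}  # from the first solve per objective
worst_reachable = {"f_1": 4.0, "f_2": 7.0}  # from the second solve per objective

lower_bounds, upper_bounds = {}, {}
for obj in objectives:
    best, worst = best_reachable[obj.symbol], worst_reachable[obj.symbol]
    # For a maximized objective the best value is numerically the largest,
    # so the two results swap roles, as in the last lines of the listing.
    lower_bounds[obj.symbol] = worst if obj.maximize else best
    upper_bounds[obj.symbol] = best if obj.maximize else worst
```

With this swap the lower bound never exceeds the upper bound, whichever direction the objective is optimized in.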

solve_reachable_solution

solve_reachable_solution(
    problem: Problem,
    reference_point: dict[str, float],
    previous_nav_point: dict[str, float],
    solver: BaseSolver | None = None,
    bounds: dict[str, float] | None = None,
) -> SolverResults

Calculates the reachable solution on the Pareto optimal front.

For the calculation to make sense in the context of NAUTILUS Navigator, the reference point should be bounded by the reachable bounds present at the navigation step the reference point has been given.

In practice, the reachable solution is calculated by solving an achievement scalarizing function.

Parameters:

Name Type Description Default
problem Problem

the problem being solved.

required
reference_point dict[str, float]

the reference point to project on the Pareto optimal front.

required
previous_nav_point dict[str, float]

the previous navigation point. The reachable solution found is always better than the previous navigation point.

required
solver BaseSolver | None

solver to solve the problem. If None, then a solver is chosen based on the problem's properties. Defaults to None.

None
bounds dict[str, float] | None

the bounds of the problem. Defaults to None.

None

Returns:

Name Type Description
SolverResults SolverResults

the results of the projection.

Source code in desdeo/mcdm/nautilus_navigator.py
def solve_reachable_solution(
    problem: Problem,
    reference_point: dict[str, float],
    previous_nav_point: dict[str, float],
    solver: BaseSolver | None = None,
    bounds: dict[str, float] | None = None,
) -> SolverResults:
    """Calculates the reachable solution on the Pareto optimal front.

    For the calculation to make sense in the context of NAUTILUS Navigator, the reference point
    should be bounded by the reachable bounds present at the navigation step the
    reference point has been given.

    In practice, the reachable solution is calculated by solving an achievement
    scalarizing function.

    Args:
        problem (Problem): the problem being solved.
        reference_point (dict[str, float]): the reference point to project on the Pareto optimal front.
        previous_nav_point (dict[str, float]): the previous navigation point. The reachable solution found
            is always better than the previous navigation point.
        solver (BaseSolver | None, optional): solver to solve the problem.
            If None, then a solver is chosen based on the problem's properties. Defaults to None.
        bounds (dict[str, float] | None, optional): the bounds of the problem. Defaults to None.

    Returns:
        SolverResults: the results of the projection.
    """
    # check solver
    init_solver = guess_best_solver(problem) if solver is None else solver

    # create and add scalarization function
    if problem.is_twice_differentiable:
        # differentiable problem
        problem_w_asf, target = add_asf_diff(
            problem,
            symbol="asf",
            reference_point=reference_point,  # TODO: reference_in_aug=True
        )
    else:
        # non-differentiable problem
        problem_w_asf, target = add_asf_nondiff(
            problem, symbol="asf", reference_point=reference_point, reference_in_aug=True
        )

    # Note: We do not solve the global problem. Instead, we solve this constrained problem:
    constraints = [
        Constraint(
            name=f"_const_{i + 1}",
            symbol=f"_const_{i + 1}",
            func=f"{obj.symbol}_min - {previous_nav_point[obj.symbol] * (-1 if obj.maximize else 1)}",
            cons_type=ConstraintTypeEnum.LTE,
            is_linear=obj.is_linear,
            is_convex=obj.is_convex,
            is_twice_differentiable=obj.is_twice_differentiable,
        )
        for i, obj in enumerate(problem.objectives)
    ]

    if bounds is not None:
        constraints += [
            Constraint(
                name=f"_const_bound_{i + 1}",
                symbol=f"_const_bound_{i + 1}",
                cons_type=ConstraintTypeEnum.LTE,
                func=f"{obj.symbol}_min - {bounds[obj.symbol] * (-1 if obj.maximize else 1)}",
                is_linear=obj.is_linear,
                is_convex=obj.is_convex,
                is_twice_differentiable=obj.is_twice_differentiable,
            )
            for i, obj in enumerate(problem.objectives)
        ]

    problem_w_asf = problem_w_asf.add_constraints(constraints)

    # solve the problem
    solver = init_solver(problem_w_asf)
    return solver.solve(target)
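
The constraint expressions built in the listing are plain strings, so their form can be inspected in isolation. The sketch below reproduces only the string construction. `Objective` is a hypothetical stand-in, and two conventions are assumed rather than confirmed by this page: that `{symbol}_min` refers to the objective in minimization form, and that an `LTE` constraint means "expression ≤ 0".

```python
from typing import NamedTuple


class Objective(NamedTuple):
    """Hypothetical stand-in for a problem objective (only two fields kept)."""

    symbol: str
    maximize: bool


def nav_constraint_exprs(objectives, previous_nav_point):
    """Build the same expression strings as the first constraint list in the listing."""
    return [
        f"{obj.symbol}_min - {previous_nav_point[obj.symbol] * (-1 if obj.maximize else 1)}"
        for obj in objectives
    ]


exprs = nav_constraint_exprs(
    [Objective("f_1", maximize=False), Objective("f_2", maximize=True)],
    {"f_1": 4.0, "f_2": 7.0},
)
```

Under the stated assumptions, the first expression keeps `f_1` at or below 4.0, and the second keeps the negated `f_2` at or below -7.0, which is the same as keeping `f_2` at or above 7.0.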

step_back_index

step_back_index(
    responses: list[NAUTILUS_Response], step_number: int
) -> int

Find the index of the response with the given step number.

Note, multiple responses can have the same step number. This may happen if the DM takes a step back. In this case, the latest response with the given step number is returned. Note, as we precalculate all the responses, it is up to the analyst to show the steps that the DM thinks they have taken. Without this, the DM may be confused. In the worst case, the DM may take a step "back to the future".

Parameters:

Name Type Description Default
responses list[NAUTILUS_Response]

Responses returned by the NAUTILUS method.

required
step_number int

The step number to go back to.

required

Returns:

Name Type Description
int int

The index of the response with the given step number.

Source code in desdeo/mcdm/nautilus_navigator.py
def step_back_index(responses: list[NAUTILUS_Response], step_number: int) -> int:
    """Find the index of the response with the given step number.

    Note, multiple responses can have the same step
    number. This may happen if the DM takes a step back. In this case, the latest response with the given step number
    is returned. Note, as we precalculate all the responses, it is up to the analyst to show the steps that the DM
    thinks they have taken. Without this, the DM may be confused. In the worst case, the DM may take a step "back to
    the future".

    Args:
        responses (list[NAUTILUS_Response]): Responses returned by the NAUTILUS method.
        step_number (int): The step number to go back to.

    Returns:
        int : The index of the response with the given step number.
    """
    relevant_indices = [i for i, response in enumerate(responses) if response.step_number == step_number]
    # Choose latest index
    return relevant_indices[-1]
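
A small example shows the "latest response wins" behavior after a step back. `Response` is a hypothetical stand-in for `NAUTILUS_Response` that keeps only `step_number`; the function body is copied from the listing above.

```python
from dataclasses import dataclass


@dataclass
class Response:
    """Hypothetical stand-in for NAUTILUS_Response (only step_number kept)."""

    step_number: int


def step_back_index(responses, step_number):
    """Return the index of the latest response with the given step number."""
    relevant_indices = [i for i, response in enumerate(responses) if response.step_number == step_number]
    return relevant_indices[-1]


# Steps 0..3 were taken, then the DM went back to step 1 and redid steps 2 and 3.
history = [Response(n) for n in (0, 1, 2, 3, 2, 3)]
latest_step_two = step_back_index(history, 2)
only_step_one = step_back_index(history, 1)
```

As written, the function raises an `IndexError` when no response has the requested step number, because the list of matching indices is empty.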

desdeo.mcdm.pareto_navigator

Functions related to the Pareto Navigator method are defined here.

Reference of the method:

Eskelinen, Petri, et al. "Pareto navigator for interactive nonlinear multiobjective optimization." OR spectrum 32 (2010): 211-227.

calculate_adjusted_speed

calculate_adjusted_speed(
    allowed_speeds: list[int],
    speed: float,
    scalar: float | None = 20,
) -> float

Calculate an adjusted speed from a given float.

Note

Adjusting the speed is not specified in the article but seems necessary.

Parameters:

Name Type Description Default
allowed_speeds ndarray

An array of allowed speeds.

required
speed float

The given speed value.

required
scalar float | None(optional)

A scale to adjust the speed. Defaults to 20.

20

Returns:

Name Type Description
float float

An adjusted speed value calculated from the given float. It is between 0 and 1.

Source code in desdeo/mcdm/pareto_navigator.py
def calculate_adjusted_speed(allowed_speeds: list[int], speed: float, scalar: float | None = 20) -> float:
    """Calculate an adjusted speed from a given float.

    Note:
        Adjusting the speed is not specified in the article but seems necessary.

    Args:
        allowed_speeds (np.ndarray): An array of allowed speeds.
        speed (float): The given speed value.
        scalar (float | None (optional)): A scale to adjust the speed. Defaults to 20.

    Returns:
        float: An adjusted speed value calculated from the given float.
            It is between 0 and 1.
    """
    return (speed / np.max(allowed_speeds)) / scalar
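
The scaling can be checked by hand with a pure-Python equivalent. This sketch replaces `np.max` with the built-in `max` to stay dependency-free; the allowed speeds 1 to 5 are an assumed example, not values taken from the library.

```python
def adjusted_speed(allowed_speeds, speed, scalar=20):
    """Normalize speed by the largest allowed speed, then divide by scalar."""
    return (speed / max(allowed_speeds)) / scalar


allowed = [1, 2, 3, 4, 5]  # assumed example values
slowest = adjusted_speed(allowed, 1)  # (1 / 5) / 20
fastest = adjusted_speed(allowed, 5)  # (5 / 5) / 20
```

With the default `scalar` of 20 and a speed no larger than the maximum allowed speed, the result stays at or below 1/20.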

calculate_all_solutions

calculate_all_solutions(
    problem: Problem,
    current_solution: dict[str, float],
    alpha: float,
    num_solutions: int,
    pref_info: dict,
) -> list[dict[str, float]]

Performs a set number of steps in the current direction.

Parameters:

Name Type Description Default
problem Problem

The problem being solved.

required
current_solution dict[str, float]

The current solution.

required
alpha float

Step size. Between 0 and 1.

required
num_solutions int

Number of solutions calculated.

required
pref_info dict

Preference information. Either "reference_point" or "classification".

required

Returns:

Type Description
list[dict[str, float]]

list[dict[str, float]]: A list of the computed solutions.

Source code in desdeo/mcdm/pareto_navigator.py
def calculate_all_solutions(
    problem: Problem,
    current_solution: dict[str, float],
    alpha: float,
    num_solutions: int,
    pref_info: dict
) -> list[dict[str, float]]:
    """Performs a set number of steps in the current direction.

    Args:
        problem (Problem): The problem being solved.
        current_solution (dict[str, float]): The current solution.
        alpha (float): Step size. Between 0 and 1.
        num_solutions (int): Number of solutions calculated.
        pref_info (dict): Preference information. Either "reference_point" or "classification".

    Returns:
        list[dict[str, float]]: A list of the computed solutions.
    """
    solution = current_solution

    # check if the preference information is given as a reference point or as classification
    # and calculate the search direction based on the preference information
    if "reference_point" in pref_info:
        reference_point = pref_info["reference_point"]
        d = calculate_search_direction(problem, reference_point, current_solution)
    elif "classification" in pref_info:
        reference_point = classification_to_reference_point(problem, pref_info["classification"], solution)
        d = calculate_search_direction(problem, reference_point, solution)

    # the A matrix and b vector from the polyhedral set equation
    matrix_a, b = get_polyhedral_set(problem)

    # the A' matrix from the linear parametric programming problem
    matrix_a_new = construct_matrix_a(problem, matrix_a)

    solutions: list[dict[str, float]] = []
    while len(solutions) < num_solutions:
        solution = calculate_next_solution(problem, d, solution, alpha, matrix_a_new, b)
        solutions.append(solution)
    return solutions

calculate_next_solution

calculate_next_solution(
    problem: Problem,
    search_direction: dict[str, float],
    current_solution: dict[str, float],
    alpha: float,
    matrix_a: ndarray,
    b: ndarray,
) -> dict[str, float]

Calculate the next solution.

Parameters:

Name Type Description Default
problem Problem

The problem being solved.

required
search_direction dict[str, float]

The search direction.

required
current_solution dict[str, float]

The currently navigated point.

required
alpha float

Step size. Between 0 and 1.

required
matrix_a ndarray

The A' matrix.

required
b ndarray

The b vector.

required

Returns:

Type Description
dict[str, float]

dict[str, float]: The next solution.

Source code in desdeo/mcdm/pareto_navigator.py
def calculate_next_solution( # NOQA: PLR0913
    problem: Problem,
    search_direction: dict[str, float],
    current_solution: dict[str, float],
    alpha: float,
    matrix_a: np.ndarray,
    b: np.ndarray
) -> dict[str, float]:
    """Calculate the next solution.

    Args:
        problem (Problem): The problem being solved.
        search_direction (dict[str, float]): The search direction.
        current_solution (dict[str, float]): The currently navigated point.
        alpha (float): Step size. Between 0 and 1.
        matrix_a (np.ndarray): The A' matrix.
        b (np.ndarray): The b vector.

    Returns:
        dict[str, float]: The next solution.
    """
    z = objective_dict_to_numpy_array(problem, current_solution)
    k = len(z)
    d = objective_dict_to_numpy_array(problem, search_direction)

    q = z + alpha * d
    q = np.reshape(q, ((k, 1)))

    b_new = np.append(q, b)

    ideal = objective_dict_to_numpy_array(problem, problem.get_ideal_point())
    nadir = objective_dict_to_numpy_array(problem, problem.get_nadir_point())

    c = np.array([1] + k * [0])

    obj_bounds = np.stack((ideal, nadir))
    bounds = [(None, None)]
    for x, y in obj_bounds.T:
        bounds.append((x, y))

    z_new = linprog(c=c, A_ub=matrix_a, b_ub=b_new, bounds=bounds)
    if z_new["success"]:
        return numpy_array_to_objective_dict(problem, z_new["x"][1:])
    return current_solution # should raise an exception instead

calculate_search_direction

calculate_search_direction(
    problem: Problem,
    reference_point: dict[str, float],
    current_point: dict[str, float],
) -> dict[str, float]

Calculate search direction from the current point to the reference point.

Parameters:

Name Type Description Default
problem Problem

The problem being solved.

required
reference_point dict[str, float]

The given reference point.

required
current_point dict[str, float]

Currently navigated point.

required

Returns:

Type Description
dict[str, float]

dict[str, float]: The direction from the current point to the reference point.

Source code in desdeo/mcdm/pareto_navigator.py
def calculate_search_direction(
    problem: Problem,
    reference_point: dict[str, float],
    current_point: dict[str, float]
) -> dict[str, float]:
    """Calculate search direction from the current point to the reference point.

    Args:
        problem (Problem): The problem being solved.
        reference_point (dict[str, float]): The given reference point.
        current_point (dict[str, float]): Currently navigated point.

    Returns:
        dict[str, float]: The direction from the current point to the reference point.
    """
    z = objective_dict_to_numpy_array(problem, current_point)
    q = objective_dict_to_numpy_array(problem, reference_point)

    d = q - z
    return numpy_array_to_objective_dict(problem, d)
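
The direction `d = q - z` feeds the step `q = z + alpha * d` used in `calculate_next_solution`. The sketch below works on plain dictionaries and ignores whatever conversion `objective_dict_to_numpy_array` performs (for example any sign handling for maximized objectives), so it is an illustration of the arithmetic only; both function names are hypothetical.

```python
def search_direction(reference_point, current_point):
    """Component-wise difference from the current point to the reference point."""
    return {key: reference_point[key] - current_point[key] for key in current_point}


def step_target(current_point, direction, alpha):
    """Point reached by moving a fraction alpha along the direction."""
    return {key: current_point[key] + alpha * direction[key] for key in current_point}


current = {"f_1": 2.0, "f_2": 8.0}
reference = {"f_1": 4.0, "f_2": 4.0}

d = search_direction(reference, current)
q = step_target(current, d, alpha=0.5)
```

With `alpha=0.5` the target lies halfway between the current point and the reference point; in the library this target then becomes part of the right-hand side of the linear program.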

classification_to_reference_point

classification_to_reference_point(
    problem: Problem,
    pref_info: dict[str, str],
    current_solution: dict[str, float],
) -> dict[str, float]

Convert preference information given as classification into a reference point.

Parameters:

Name Type Description Default
problem Problem

The problem being solved.

required
pref_info dict[str, str]

The preference information given as classification.

required
current_solution dict[str, float]

The current solution.

required

Returns:

Type Description
dict[str, float]

dict[str, float]: A reference point converted from classification.

Source code in desdeo/mcdm/pareto_navigator.py
def classification_to_reference_point(
    problem: Problem,
    pref_info: dict[str, str],
    current_solution: dict[str, float]
) -> dict[str, float]:
    """Convert preference information given as classification into a reference point.

    Args:
        problem (Problem): The problem being solved.
        pref_info (dict[str, str]): The preference information given as classification.
        current_solution (dict[str, float]): The current solution.

    Returns:
        dict[str, float]: A reference point converted from classification.
    """
    ref = []
    ideal = problem.get_ideal_point()
    nadir = problem.get_nadir_point()

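    # Iterate in the problem's objective order (not the order of pref_info) so that
    # the values line up with the objectives when converted back to a dict.
    for obj in problem.objectives:
        pref = pref_info[obj.symbol]
        if pref == "<":
            ref.append(ideal[obj.symbol])
        elif pref == ">":
            ref.append(nadir[obj.symbol])
        elif pref == "=":
            ref.append(current_solution[obj.symbol])

    return numpy_array_to_objective_dict(problem, np.array(ref))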
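The mapping from classifications to reference point values can be sketched with plain dictionaries. The helper `to_reference_point`, the objective symbols, and all numeric values below are hypothetical and only illustrate the rule: `"<"` takes the ideal value, `">"` the nadir value, and `"="` the current value.

```python
# Hypothetical three-objective data; not taken from any real problem.
ideal = {"f_1": 0.0, "f_2": 1.0, "f_3": -2.0}
nadir = {"f_1": 10.0, "f_2": 4.0, "f_3": 3.0}
current_solution = {"f_1": 4.0, "f_2": 2.5, "f_3": 0.0}
pref_info = {"f_1": "<", "f_2": ">", "f_3": "="}


def to_reference_point(pref_info, current, ideal, nadir):
    """Illustrative helper: pick each value from the point its class refers to."""
    sources = {"<": ideal, ">": nadir, "=": current}
    # An unknown classification symbol raises a KeyError here.
    return {symbol: sources[cls][symbol] for symbol, cls in pref_info.items()}


reference_point = to_reference_point(pref_info, current_solution, ideal, nadir)
print(reference_point)  # {'f_1': 0.0, 'f_2': 4.0, 'f_3': 0.0}
```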

construct_matrix_a

construct_matrix_a(
    problem: Problem, matrix_a: ndarray
) -> np.ndarray

Construct the A' matrix in the linear parametric programming problem from the article.

Parameters:

Name Type Description Default
problem Problem

The problem being solved.

required
matrix_a ndarray

The A matrix from the polyhedral set equation.

required

Returns:

Type Description
ndarray

np.ndarray: The A' matrix in the linear parametric programming problem from the article.

Source code in desdeo/mcdm/pareto_navigator.py
def construct_matrix_a(problem: Problem, matrix_a: np.ndarray) -> np.ndarray:
    """Construct the A' matrix in the linear parametric programming problem from the article.

    Args:
        problem (Problem): The problem being solved.
        matrix_a (np.ndarray): The A matrix from the polyhedral set equation.

    Returns:
        np.ndarray: The A' matrix in the linear parametric programming problem from the article.
    """
    ideal = objective_dict_to_numpy_array(problem, problem.get_ideal_point())
    nadir = objective_dict_to_numpy_array(problem, problem.get_nadir_point())
    weights = 1 / (nadir - ideal)

    # column vector with the entries -1 / w_i
    weights_inverse = (-1 / weights).reshape(-1, 1)
    identity = np.identity(len(weights))
    a_upper = np.c_[weights_inverse, identity]

    zeros = np.zeros((len(matrix_a), 1))
    a_lower = np.c_[zeros, matrix_a]

    return np.concatenate((a_upper, a_lower))
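The block structure of the resulting matrix is easier to see on a small case. The sketch below repeats the same NumPy steps on made-up ideal and nadir vectors and a made-up 3x2 matrix `matrix_a`; none of these values come from a real problem.

```python
import numpy as np

# Hypothetical two-objective data.
ideal = np.array([0.0, 1.0])
nadir = np.array([4.0, 3.0])
weights = 1 / (nadir - ideal)  # [0.25, 0.5]

# Hypothetical A matrix with three rows (one per facet) and two columns (one per objective).
matrix_a = np.array([[1.0, 0.0], [0.0, 1.0], [-1.0, -1.0]])

# Upper block: a column of -1/w_i next to an identity matrix.
a_upper = np.c_[(-1 / weights).reshape(-1, 1), np.identity(len(weights))]

# Lower block: a column of zeros next to the original A matrix.
a_lower = np.c_[np.zeros((len(matrix_a), 1)), matrix_a]

a_prime = np.concatenate((a_upper, a_lower))
print(a_prime.shape)  # (5, 3)
```

The result has one extra column compared to `matrix_a`, and one extra row per objective.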

get_polyhedral_set

get_polyhedral_set(
    problem: Problem,
) -> tuple[np.ndarray, np.ndarray]

Get a polyhedral set as the convex hull of the set of Pareto optimal solutions.

Parameters:

Name Type Description Default
problem Problem

The problem being solved.

required

Returns:

Type Description
tuple[ndarray, ndarray]

tuple[np.ndarray, np.ndarray]: The A matrix and b vector from the polyhedral set equation.

Source code in desdeo/mcdm/pareto_navigator.py
def get_polyhedral_set(problem: Problem) -> tuple[np.ndarray, np.ndarray]:
    """Get a polyhedral set as the convex hull of the set of Pareto optimal solutions.

    Args:
        problem (Problem): The problem being solved.

    Returns:
        tuple[np.ndarray, np.ndarray]: The A matrix and b vector from the polyhedral set equation.
    """
    objective_values = problem.discrete_representation.objective_values
    representation = np.array([objective_values[obj.symbol] for obj in problem.objectives])

    convex_hull = ConvexHull(representation.T)
    matrix_a = convex_hull.equations[:, 0:-1]
    b = -convex_hull.equations[:, -1]
    return matrix_a, b
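How the facet equations of `scipy.spatial.ConvexHull` turn into the pair `A`, `b` can be sketched on a toy point set. Each row of `equations` holds a facet normal followed by an offset, and points in the hull satisfy `normal @ x + offset <= 0`, which is the same as `A @ x <= b` with `b = -offset`. The points below are made up, with one point per row.

```python
import numpy as np
from scipy.spatial import ConvexHull

# Hypothetical objective vectors: the corners of the unit square plus its center.
points = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [1.0, 1.0], [0.5, 0.5]])

hull = ConvexHull(points)
matrix_a = hull.equations[:, :-1]  # facet normals
b = -hull.equations[:, -1]         # negated offsets

tol = 1e-9  # small tolerance for floating point round-off
inside_ok = bool(np.all(matrix_a @ np.array([0.25, 0.75]) <= b + tol))
outside_ok = bool(np.all(matrix_a @ np.array([1.5, 0.5]) <= b + tol))

print(inside_ok)   # True
print(outside_ok)  # False
```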

desdeo.mcdm.reference_point_method

Functions related to the reference point method.

This module contains functions related to the reference point method as presented in Wierzbicki, A. P. (1982). A mathematical basis for satisficing decision making. Mathematical modelling, 3(5), 391-405.

ReferencePointError

Bases: Exception

Raised when an error with the reference point method is encountered.

Source code in desdeo/mcdm/reference_point_method.py
class ReferencePointError(Exception):
    """Raised when an error with the reference point method is encountered."""

rpm_intermediate_solutions

rpm_intermediate_solutions(
    problem: Problem,
    solution_1: dict[str, float],
    solution_2: dict[str, float],
    num_desired: int,
    scalarization_options: dict | None = None,
    solver: BaseSolver | None = None,
    solver_options: SolverOptions | None = None,
) -> list[SolverResults]

Generates a desired number of intermediate solutions between two given solutions.

Generates a desired number of intermediate solutions between two given reference vectors. The solutions are generated by taking num_desired evenly spaced points strictly between the two reference points in the objective space. The vectors corresponding to these points are then utilized as reference points in the achievement scalarizing function. Solving the scalarized problem for each reference point yields a solution close to the projection of the reference point on the Pareto optimal front of the problem. These solutions are then returned. Note that the intermediate solutions are generated between the two given reference points; this means the returned solutions will not include solutions corresponding to the original reference points.

Parameters:

Name Type Description Default
problem Problem

the problem being solved.

required
solution_1 dict[str, float]

the first of the reference points between which the intermediate solutions are to be generated.

required
solution_2 dict[str, float]

the second of the reference points between which the intermediate solutions are to be generated.

required
num_desired int

the number of desired intermediate solutions to be generated. Must be at least 1.

required
scalarization_options dict | None

optional kwargs passed to the scalarization function. Defaults to None.

None
solver BaseSolver | None

solver used to solve the problem. If not given, an appropriate solver will be automatically determined based on the features of problem. Defaults to None.

None
solver_options SolverOptions | None

optional options passed to the solver. Ignored if solver is None. Defaults to None.

None

Returns:

Type Description
list[SolverResults]

list[SolverResults]: a list with the projected intermediate solutions as SolverResults objects.

Source code in desdeo/mcdm/reference_point_method.py
def rpm_intermediate_solutions(  # noqa: PLR0913
    problem: Problem,
    solution_1: dict[str, float],
    solution_2: dict[str, float],
    num_desired: int,
    scalarization_options: dict | None = None,
    solver: BaseSolver | None = None,
    solver_options: SolverOptions | None = None,
) -> list[SolverResults]:
    """Generates a desired number of intermediate solutions between two given solutions.

    Generates a desired number of intermediate solutions between two given reference vectors.
    The solutions are generated by taking `num_desired` evenly spaced points strictly between the
    two reference points in the objective space. The vectors corresponding to these points are then
    utilized as reference points in the achievement scalarizing function. Solving the scalarized problem
    for each reference point yields a solution close to the projection of the reference point on the
    Pareto optimal front of the problem. These solutions are then returned. Note that the
    intermediate solutions are generated _between_ the two given reference points; this means the
    returned solutions will not include solutions corresponding to the original reference points.

    Args:
        problem (Problem): the problem being solved.
        solution_1 (dict[str, float]): the first of the reference points between which the intermediate
            solutions are to be generated.
        solution_2 (dict[str, float]): the second of the reference points between which the intermediate
            solutions are to be generated.
        num_desired (int): the number of desired intermediate solutions to be generated. Must be at least `1`.
        scalarization_options (dict | None, optional): optional kwargs passed to the scalarization function.
            Defaults to None.
        solver (BaseSolver | None, optional): solver used to solve the problem.
            If not given, an appropriate solver will be automatically determined based on the features of `problem`.
            Defaults to None.
        solver_options (SolverOptions | None, optional): optional options passed
            to the `solver`. Ignored if `solver` is `None`.
            Defaults to None.

    Returns:
        list[SolverResults]: a list with the projected intermediate solutions as
            `SolverResults` objects.
    """
    if int(num_desired) < 1:
        msg = f"The given number of desired intermediate ({num_desired=}) solutions must be at least 1."
        raise ReferencePointError(msg)

    init_solver = guess_best_solver(problem) if solver is None else solver
    _solver_options = None if solver_options is None or solver is None else solver_options

    # Find intermediate solutions by dividing the distance between the two Pareto points into num_desired+1 steps,
    # calculate the solutions found in between the points, because we don't want to find the original solutions
    intermediate_solutions = []

    # the ASF variant depends only on the problem, so choose it once
    add_asf = add_asf_diff if problem.is_twice_differentiable else add_asf_nondiff

    for i in range(num_desired):
        # weighted average of the two solutions; the end points themselves are never reached
        rp = {
            key: ((i + 1) * solution_1[key] + (num_desired - i) * solution_2[key]) / (num_desired + 1)
            for key in solution_1
        }
        # add scalarization
        asf_problem, target = add_asf(problem, "target", rp, **(scalarization_options or {}))

        # use a separate name so that the `solver` argument is not shadowed
        _solver = init_solver(asf_problem, _solver_options)

        # solve and store results
        result: SolverResults = _solver.solve(target)

        intermediate_solutions.append(result)

    return intermediate_solutions
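The reference points used inside the loop are weighted averages of the two given solutions. The sketch below reproduces only that interpolation step, without any scalarization or solver. The helper name `intermediate_reference_points` and the objective symbols are hypothetical.

```python
def intermediate_reference_points(solution_1, solution_2, num_desired):
    """Illustrative helper: points strictly between the two given objective vectors."""
    return [
        {
            key: ((i + 1) * solution_1[key] + (num_desired - i) * solution_2[key]) / (num_desired + 1)
            for key in solution_1
        }
        for i in range(num_desired)
    ]


# Hypothetical two-objective solutions.
solution_1 = {"f_1": 0.0, "f_2": 8.0}
solution_2 = {"f_1": 4.0, "f_2": 0.0}

points = intermediate_reference_points(solution_1, solution_2, num_desired=3)
for point in points:
    print(point)
# {'f_1': 3.0, 'f_2': 2.0}
# {'f_1': 2.0, 'f_2': 4.0}
# {'f_1': 1.0, 'f_2': 6.0}
```

The first generated point lies closest to `solution_2` and the last one closest to `solution_1`; neither end point is included.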

rpm_solve_solutions

rpm_solve_solutions(
    problem: Problem,
    reference_point: dict[str, float],
    scalarization_options: dict | None = None,
    solver: BaseSolver | None = None,
    solver_options: SolverOptions | None = None,
) -> list[SolverResults]

Finds (near) Pareto optimal solutions based on a reference point.

Find a (near) Pareto optimal solution based on the given reference point by optimizing an achievement scalarizing function. The original reference point is then perturbed once along each objective axis, and another k (near) Pareto optimal solutions are found, where k is the number of objective functions. The k+1 solutions are then returned.

Parameters:

Name Type Description Default
problem Problem

the problem to be solved.

required
reference_point dict[str, float]

the reference point. The keys of the dict represent the objective function symbols defined in problem. The values represent the aspiration level values.

required
scalarization_options dict | None

keyword arguments to be passed to the achievement scalarizing function. Defaults to None.

None
solver BaseSolver | None

solver to optimize the achievement scalarizing function. If not given, the method tries to guess the most suitable solver based on the problem. Defaults to None.

None
solver_options SolverOptions | None

options passed to the solver. If not given, the solver will use its default options. Defaults to None.

None

Raises:

Type Description
ReferencePointError

the reference point is ill-defined.

Returns:

Type Description
list[SolverResults]

list[SolverResults]: a list of results containing the solutions found using the reference point and the k perturbed reference points.

Note

If the problem is twice differentiable, add_asf_diff is used from desdeo.tools. If the problem is not twice differentiable, then add_asf_nondiff is used.

Source code in desdeo/mcdm/reference_point_method.py
def rpm_solve_solutions(
    problem: Problem,
    reference_point: dict[str, float],
    scalarization_options: dict | None = None,
    solver: BaseSolver | None = None,
    solver_options: SolverOptions | None = None,
) -> list[SolverResults]:
    """Finds (near) Pareto optimal solutions based on a reference point.

    Find a (near) Pareto optimal solution based on the given reference point by
    optimizing an achievement scalarizing function. The original reference point
    is then perturbed once along each objective axis, and another k (near) Pareto
    optimal solutions are found, where k is the number of objective functions.
    The k+1 solutions are then returned.

    Args:
        problem (Problem): the problem to be solved.
        reference_point (dict[str, float]): the reference point. The keys of the dict
            represent the objective function symbols defined in problem. The values
            represent the aspiration level values.
        scalarization_options (dict | None, optional): keyword arguments to be
            passed to the achievement scalarizing function. Defaults to None.
        solver (BaseSolver | None, optional): solver to optimize the achievement
            scalarizing function. If not given, the method tries to guess the
            most suitable solver based on the problem. Defaults to None.
        solver_options (SolverOptions | None, optional): options passed to the
            solver. If not given, the solver will use its default options. Defaults to None.

    Raises:
        ReferencePointError: the reference point is ill-defined.

    Returns:
        list[SolverResults]: a list of results containing the solutions found using
            the reference point and the k perturbed reference points.

    Note:
        If the problem is twice differentiable, `add_asf_diff` is used from `desdeo.tools`.
            If the problem is not twice differentiable, then `add_asf_nondiff` is used.
    """
    # setup problem with ASF
    # check if problem is differentiable or not, choose appropriate ASF variant.

    if not all(obj.symbol in reference_point for obj in problem.objectives):
        msg = f"The reference point {reference_point} is missing entries for one or more of the objective functions."
        raise ReferencePointError(msg)

    _add_asf = add_asf_diff if problem.is_twice_differentiable else add_asf_nondiff

    problem_w_asf, target = _add_asf(problem, "_asf", reference_point, **(scalarization_options or {}))

    # setup solver
    # solve scalarized problem with given reference point

    _init_solver = guess_best_solver(problem_w_asf) if solver is None else solver
    _solver = _init_solver(problem_w_asf, solver_options)

    initial_solution = _solver.solve(target)

    # using the found solution, perturb the reference point to get
    # k (num of objectives) perturbed reference points

    initial_objective_vector = objective_dict_to_numpy_array(problem, initial_solution.optimal_objectives)
    reference_point_vector = objective_dict_to_numpy_array(problem, reference_point)

    distance = np.linalg.norm(reference_point_vector - initial_objective_vector)
    unit_vectors = np.eye(len(initial_objective_vector))

    perturbed_reference_point_vectors = reference_point_vector + (distance * unit_vectors)

    perturbed_reference_points = [numpy_array_to_objective_dict(problem, v) for v in perturbed_reference_point_vectors]

    # scalarize the problem using the appropriate ASF variant and the perturbed
    # reference points

    perturbed_problems_and_targets = [
        _add_asf(problem, "_asf", rp, **(scalarization_options or {})) for rp in perturbed_reference_points
    ]

    # solve the problems
    perturbed_solutions = [
        _init_solver(problem_and_target[0], solver_options).solve(problem_and_target[1])
        for problem_and_target in perturbed_problems_and_targets
    ]

    # return the original solution and the solutions found with the perturbed reference points
    return [initial_solution, *perturbed_solutions]
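The perturbation step can be sketched without a solver. Given the reference point and the objective vector of the first solution, each perturbed reference point shifts exactly one objective by the Euclidean distance between the two. The objective symbols and values below are hypothetical, and plain Python replaces the NumPy broadcasting used in the source.

```python
import math

# Hypothetical three-objective data.
reference_point = {"f_1": 1.0, "f_2": 2.0, "f_3": 2.0}
initial_objectives = {"f_1": 4.0, "f_2": 6.0, "f_3": 2.0}

symbols = list(reference_point)

# Euclidean distance between the reference point and the first solution.
distance = math.dist(
    [reference_point[s] for s in symbols],
    [initial_objectives[s] for s in symbols],
)

# One perturbed reference point per objective: shift that objective by the distance.
perturbed_reference_points = [
    {s: reference_point[s] + (distance if s == shifted else 0.0) for s in symbols}
    for shifted in symbols
]

print(distance)  # 5.0
for rp in perturbed_reference_points:
    print(rp)
# {'f_1': 6.0, 'f_2': 2.0, 'f_3': 2.0}
# {'f_1': 1.0, 'f_2': 7.0, 'f_3': 2.0}
# {'f_1': 1.0, 'f_2': 2.0, 'f_3': 7.0}
```

With k objectives this gives k perturbed reference points, which together with the original one account for the k+1 returned solutions.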