# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Copyright (c) 2016 Vesa Ojalehto
"""
NAUTILUS method variants. The first NAUTILUS variant was introduced in [MIETTINEN2010]_.
References
----------
.. [MIETTINEN2010] Miettinen, K.; Eskelinen, P.; Ruiz, F. & Luque, M.
NAUTILUS method: An interactive technique in multiobjective optimization
based on the nadir point
European Journal of Operational Research, 2010, 206, 426-434
TODO
----
Add all variants
Longer descriptions of the method variants and methods
"""
import logging
from typing import List, Tuple, Type # noqa - rm when pyflakes understands type hints
from warnings import warn
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import pairwise_distances_argmin_min
import desdeo.utils as utils
from desdeo.core.ResultFactory import BoundsFactory, IterationPointFactory
from desdeo.optimization.OptimizationMethod import OptimizationMethod
from desdeo.optimization.OptimizationProblem import (
EpsilonConstraintProblem,
MaxEpsilonConstraintProblem,
NautilusAchievementProblem,
)
from desdeo.utils import misc, reachable_points
from desdeo.utils.warnings import UnexpectedCondition
from .base import InteractiveMethod
[docs]class NAUTILUS(InteractiveMethod):
[docs] def __init__(self, method, method_class: Type[OptimizationMethod]) -> None:
super().__init__(method, method_class)
self.user_iters = 5
self.current_iter = self.user_iters
self.fh_factory = IterationPointFactory(
self.method_class(NautilusAchievementProblem(self.problem))
)
self.upper_bounds_factory = BoundsFactory(
self.method_class(MaxEpsilonConstraintProblem(self.problem))
)
self.lower_bounds_factory = BoundsFactory(
self.method_class(EpsilonConstraintProblem(self.problem))
)
self.preference = None
self.fh_lo, self.zh = tuple(self.problem.objective_bounds())
self.fh = list(self.fh_lo)
self.zh_prev = list(self.zh)
[docs] def _init_iteration(self, *args, **kwargs):
pass
[docs] def _next_iteration(self, *args, **kwargs):
pass
[docs] def _update_fh(self):
self.fh = list(self.fh_factory.result(self.preference, self.zh_prev)[1])
[docs] def _next_zh(self, term1, term2):
res = list(
(self.current_iter - 1)
* np.array(term1)
/ self.current_iter
+ np.array(term2)
/ self.current_iter
)
logging.debug("Update zh")
logging.debug("First term: %s", term1)
logging.debug("Second term: %s", term2)
for i in range(3):
logging.debug(
"%i/%i * %f + %f/%i =%f",
(self.current_iter - 1),
self.current_iter,
term1[i],
term2[i],
self.current_iter,
res[i],
)
return res
[docs] def _update_zh(self, term1, term2):
self.zh_prev = list(self.zh)
self.zh = list(self._next_zh(term1, term2))
[docs] def distance(self, where: List[float], target: List[float]):
a_nadir = np.array(self.problem.nadir)
a_ideal = np.array(self.problem.ideal)
w = a_ideal - a_nadir
u = np.linalg.norm((np.array(where) - a_nadir) / w)
l = np.linalg.norm((np.array(target) - a_nadir) / w)
logging.debug("\nNADIR: %s", self.problem.nadir)
logging.debug("zh: %s", where)
logging.debug("PO: %s", target)
if not l:
logging.debug("Distance: %s", 0.0)
return 0.0
logging.debug("Distance: %s", (u / l) * 100)
return (u / l) * 100
[docs]class ENAUTILUS(NAUTILUS):
[docs] def __init__(self, method, method_class: Type[OptimizationMethod]) -> None:
super().__init__(method, method_class)
self.Ns = 5
self.fh_lo_prev = None
self.fh_lo_prev = None
self.nsPoint_prev = [] # type: List[float]
self.zhs = [] # type: List[float]
self.zh_los = [] # type: List[float]
self.zh_reach = [] # type: List[int]
self.NsPoints = [] # type: List[Tuple[np.ndarray, List[float]]]
[docs] def print_current_iteration(self):
if self.current_iter < 0:
print("Final iteration point:", self.zh_prev)
else:
print(
"Iteration %s/%s\n"
% (self.user_iters - self.current_iter, self.user_iters)
)
for pi, ns_point in enumerate(self.NsPoints):
print(
"Ns %i (%s)" % (pi + 1, self.zh_reach[pi] if self.zh_reach else "-")
)
print("Iteration point:", self.zhs[pi])
if self.current_iter != 0:
print("Lower boundary: ", self.zh_los[pi])
print("Closeness: ", self.distance(self.zhs[pi], ns_point))
print("==============================")
[docs] def _update_zh(self, term1, term2):
return self._next_zh(term1, term2)
[docs] def select_point(self, point):
pass
[docs] def next_iteration(self, *args, preference=None, **kwargs):
if preference and preference[0]:
self.zh_prev = list(preference[0])
else:
self.zh_prev = self.problem.nadir[:]
if preference and preference[1]:
self.fh_lo = list(preference[1])
else:
self.fh_lo = self.problem.ideal[:]
# TODO Create weights if not given, e.g., using
# Steuer RE, Choo EU. An interactive weighted Tchebycheff procedure for
# multiple objective programming.
# Mathematical programming. 1983; 26(3):326-44.
points = misc.new_points(self.fh_factory, self.zh)
# Find centroids
if len(points) <= self.Ns:
print(
(
"Only %s points can be reached from selected iteration point"
% len(points)
)
)
self.NsPoints = points
else:
# k-mean cluster Ns solutions
k_means = KMeans(n_clusters=self.Ns)
solution_points = [points[1] for point in points]
k_means.fit(solution_points)
closest = set(
pairwise_distances_argmin_min(
k_means.cluster_centers_, solution_points
)[
0
]
)
self.NsPoints = []
for point_idx in closest:
self.NsPoints.append(points[point_idx])
# Find iteration point for each centroid
for _, point in self.NsPoints:
self.zhs.append(self._update_zh(self.zh_prev, point))
self.fh_lo = list(self.lower_bounds_factory.result(self.zh_prev))
self.zh_los.append(self.fh_lo)
if not self.problem.points:
self.zh_reach = []
else:
self.zh_reach.append(
len(reachable_points(self.NsPoints, self.zh_los[-1], self.zhs[-1]))
)
self.current_iter -= 1
return list(zip(self.zh_los, self.zhs))
[docs]class NAUTILUSv1(NAUTILUS):
"""
The first NAUTILUS method variant [MIETTINEN2010]_.
References
----------
.. [MIETTINEN2010] Miettinen, K.; Eskelinen, P.; Ruiz, F. & Luque, M.,
NAUTILUS method: An interactive technique in multiobjective optimization based on the nadir point,
European Journal of Operational Research, 2010 , 206 , 426-434.
"""
[docs] def print_current_iteration(self):
if self.current_iter == 0:
print("Closeness: ", self.distance(self.zh, self.fh))
print("Final iteration point:", self.zh)
else:
print(
"Iteration %s/%s"
% (self.user_iters - self.current_iter, self.user_iters)
)
print("Closeness: ", self.distance(self.zh, self.fh))
print("Iteration point:", self.zh)
print("Lower boundary:", self.fh_lo)
print("==============================")
[docs] def __init__(self, method, method_class):
super().__init__(method, method_class)
[docs] def _update_fh(self):
self.fh = list(self.fh_factory.result(self.preference, self.zh_prev)[1])
[docs] def next_iteration(self, preference=None):
"""
Return next iteration bounds
"""
if preference:
self.preference = preference
print(("Given preference: %s" % self.preference.pref_input))
self._update_fh()
# tmpzh = list(self.zh)
self._update_zh(self.zh, self.fh)
# self.zh = list(np.array(self.zh) / 2. + np.array(self.zh_prev) / 2.)
# self.zh_prev = tmpzh
if self.current_iter != 1:
self.fh_lo = list(self.lower_bounds_factory.result(self.zh_prev))
self.current_iter -= 1
return self.fh_lo, self.zh
[docs]class NNAUTILUS(NAUTILUS):
"""
NAVIGATOR NAUTILUS method
Attributes
----------
fh : list of floats
Current non-dominated point
zh : list of floats
Current iteration point
fh_up : list of floats
Upper boundary for iteration points reachable from iteration point zh
fh_lo : list of floats
Lower boundary for iteration points reachable from iteration point zh
"""
[docs] class NegativeIntervalWarning(UnexpectedCondition):
[docs] def __str__(self):
return "Upper boundary is smaller than lower boundary"
[docs] def __init__(self, method, method_class):
super().__init__(method, method_class)
self.current_iter = 100
self.ref_point = None
self.fh_up = None
[docs] def _update_fh(self):
from desdeo.preference.direct import DirectSpecification
u = [1.0] * len(self.ref_point)
pref = DirectSpecification(self.problem, u, self.ref_point)
self.fh = list(self.fh_factory.result(pref, self.zh_prev)[1])
logging.debug("updated fh: %s", self.fh)
[docs] def update_points(self):
self.problem.points = reachable_points(
self.problem.points, self.fh_lo, self.fh_up
)
[docs] def next_iteration(self, ref_point, bounds=None):
"""
Calculate the next iteration point to be shown to the DM
Parameters
----------
ref_point : list of float
Reference point given by the DM
"""
if bounds:
self.problem.points = reachable_points(
self.problem.points, self.problem.ideal, bounds
)
if not utils.isin(self.fh, self.problem.points) or ref_point != self.ref_point:
self.ref_point = list(ref_point)
self._update_fh()
self._update_zh(self.zh, self.fh)
self.fh_lo = list(self.lower_bounds_factory.result(self.zh))
self.fh_up = list(self.upper_bounds_factory.result(self.zh))
logging.debug(f"Updated upper boundary: {self.fh_up}")
logging.debug(f"Uppadet lower boundary: {self.fh_lo}")
if not np.all(np.array(self.fh_up) > np.array(self.fh_lo)):
warn(self.NegativeIntervalWarning())
assert utils.isin(self.fh_up, self.problem.points)
assert utils.isin(self.fh_lo, self.problem.points)
dist = self.distance(self.zh, self.fh)
# Reachable points
self.update_points()
lP = len(self.problem.points)
self.current_iter -= 1
return dist, self.fh, self.zh, self.fh_lo, self.fh_up, lP