Source code for optuna.samplers._cmaes

from __future__ import annotations

from collections.abc import Sequence
import copy
import math
import pickle
from typing import Any
from typing import cast
from typing import TYPE_CHECKING
from typing import Union
import warnings

import numpy as np

import optuna
from optuna import _deprecated
from optuna import logging
from optuna._experimental import warn_experimental_argument
from optuna._imports import _LazyImport
from optuna._transform import _SearchSpaceTransform
from optuna.distributions import BaseDistribution
from optuna.distributions import FloatDistribution
from optuna.distributions import IntDistribution
from optuna.samplers import BaseSampler
from optuna.samplers._lazy_random_state import LazyRandomState
from optuna.search_space import IntersectionSearchSpace
from optuna.study._study_direction import StudyDirection
from optuna.trial import FrozenTrial
from optuna.trial import TrialState


if TYPE_CHECKING:
    import cmaes

    CmaClass = Union[cmaes.CMA, cmaes.SepCMA, cmaes.CMAwM]
else:
    cmaes = _LazyImport("cmaes")

_logger = logging.get_logger(__name__)

_EPS = 1e-10
# The value of system_attrs must be less than 2046 characters on RDBStorage.
_SYSTEM_ATTR_MAX_LENGTH = 2045
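# A sketch of the chunked storage this limit implies (see `_split_optimizer_str`
# below): a pickled optimizer is hex-encoded and split into pieces of at most
# `_SYSTEM_ATTR_MAX_LENGTH` characters, e.g. a 5000-character hex string becomes
#
#     {"cma:optimizer:0": hex_str[0:2045],
#      "cma:optimizer:1": hex_str[2045:4090],
#      "cma:optimizer:2": hex_str[4090:5000]}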


class CmaEsSampler(BaseSampler):
    """A sampler using `cmaes <https://github.com/CyberAgentAILab/cmaes>`__ as the backend.

    Example:

        Optimize a simple quadratic function by using
        :class:`~optuna.samplers.CmaEsSampler`.

        .. code-block:: console

            $ pip install cmaes

        .. testcode::

            import optuna


            def objective(trial):
                x = trial.suggest_float("x", -1, 1)
                y = trial.suggest_int("y", -1, 1)
                return x**2 + y


            sampler = optuna.samplers.CmaEsSampler()
            study = optuna.create_study(sampler=sampler)
            study.optimize(objective, n_trials=20)

    Please note that this sampler does not support CategoricalDistribution.
    However, :class:`~optuna.distributions.FloatDistribution` with ``step``
    (:func:`~optuna.trial.Trial.suggest_float`) and
    :class:`~optuna.distributions.IntDistribution`
    (:func:`~optuna.trial.Trial.suggest_int`) are supported.
    If your search space contains categorical parameters, we recommend using
    :class:`~optuna.samplers.TPESampler` instead.
    Furthermore, there is room for performance improvements in parallel
    optimization settings, because this sampler cannot use some trials for
    updating the parameters of the multivariate normal distribution.

    For further information about the CMA-ES algorithm, please refer to the
    following papers:

    - `N. Hansen, The CMA Evolution Strategy: A Tutorial. arXiv:1604.00772, 2016.
      <https://arxiv.org/abs/1604.00772>`__
    - `A. Auger and N. Hansen. A restart CMA evolution strategy with increasing
      population size. In Proceedings of the IEEE Congress on Evolutionary
      Computation (CEC 2005), pages 1769–1776. IEEE Press, 2005.
      <https://doi.org/10.1109/CEC.2005.1554902>`__
    - `N. Hansen. Benchmarking a BI-Population CMA-ES on the BBOB-2009 Function
      Testbed. GECCO Workshop, 2009. <https://doi.org/10.1145/1570256.1570333>`__
    - `Raymond Ros, Nikolaus Hansen. A Simple Modification in CMA-ES Achieving
      Linear Time and Space Complexity. 10th International Conference on Parallel
      Problem Solving From Nature, Sep 2008, Dortmund, Germany. inria-00287367.
      <https://doi.org/10.1007/978-3-540-87700-4_30>`__
    - `Masahiro Nomura, Shuhei Watanabe, Youhei Akimoto, Yoshihiko Ozaki, Masaki
      Onishi. Warm Starting CMA-ES for Hyperparameter Optimization, AAAI. 2021.
      <https://doi.org/10.1609/aaai.v35i10.17109>`__
    - `R. Hamano, S. Saito, M. Nomura, S. Shirakawa. CMA-ES with Margin:
      Lower-Bounding Marginal Probability for Mixed-Integer Black-Box
      Optimization, GECCO. 2022. <https://doi.org/10.1145/3512290.3528827>`__
    - `M. Nomura, Y. Akimoto, I. Ono. CMA-ES with Learning Rate Adaptation: Can
      CMA-ES with Default Population Size Solve Multimodal and Noisy Problems?,
      GECCO. 2023. <https://doi.org/10.1145/3583131.3590358>`__

    .. seealso::
        You can also use `optuna_integration.PyCmaSampler
        <https://optuna-integration.readthedocs.io/en/stable/reference/generated/optuna_integration.PyCmaSampler.html#optuna_integration.PyCmaSampler>`__,
        which is a sampler using the cma library as the backend.

    Args:
        x0:
            A dictionary of initial parameter values for CMA-ES. By default, the
            mean of ``low`` and ``high`` for each distribution is used. Note that
            ``x0`` is sampled uniformly within the search space domain for each
            restart if you specify the ``restart_strategy`` argument.

        sigma0:
            Initial standard deviation of CMA-ES. By default, ``sigma0`` is set
            to ``min_range / 6``, where ``min_range`` denotes the minimum range
            of the distributions in the search space.

        seed:
            A random seed for CMA-ES.

        n_startup_trials:
            The independent sampling is used instead of the CMA-ES algorithm
            until the given number of trials finish in the same study.

        independent_sampler:
            A :class:`~optuna.samplers.BaseSampler` instance that is used for
            independent sampling. The parameters not contained in the relative
            search space are sampled by this sampler. The search space for
            :class:`~optuna.samplers.CmaEsSampler` is determined by
            :func:`~optuna.search_space.intersection_search_space()`.

            If :obj:`None` is specified, :class:`~optuna.samplers.RandomSampler`
            is used as the default.

            .. seealso::
                The :class:`optuna.samplers` module provides built-in independent
                samplers such as :class:`~optuna.samplers.RandomSampler` and
                :class:`~optuna.samplers.TPESampler`.

        warn_independent_sampling:
            If this is :obj:`True`, a warning message is emitted when the value
            of a parameter is sampled by using an independent sampler.

            Note that the parameters of the first trial in a study are always
            sampled via an independent sampler, so no warning messages are
            emitted in this case.

        restart_strategy:
            Strategy for restarting CMA-ES optimization when it converges to a
            local minimum. If :obj:`None` is given, CMA-ES will not restart
            (default). If ``'ipop'`` is given, CMA-ES will restart with
            increasing population size. If ``'bipop'`` is given, CMA-ES will
            restart with the population size increased or decreased. Please see
            also the ``inc_popsize`` parameter.

            .. warning::
                Deprecated in v4.4.0. The ``restart_strategy`` argument will be
                removed in the future. The removal of this feature is currently
                scheduled for v6.0.0, but this schedule is subject to change.
                From v4.4.0 onward, ``restart_strategy`` automatically falls
                back to :obj:`None`, and ``restart_strategy`` will be supported
                in OptunaHub.
                See https://github.com/optuna/optuna/releases/tag/v4.4.0.

        popsize:
            A population size of CMA-ES.

        inc_popsize:
            Multiplier for increasing population size before each restart. This
            argument is used when ``restart_strategy = 'ipop'`` or
            ``restart_strategy = 'bipop'`` is specified.

            .. warning::
                Deprecated in v4.4.0. The ``inc_popsize`` argument will be
                removed in the future. The removal of this feature is currently
                scheduled for v6.0.0, but this schedule is subject to change.
                From v4.4.0 onward, ``inc_popsize`` is no longer utilized within
                Optuna, and ``inc_popsize`` will be supported in OptunaHub.
                See https://github.com/optuna/optuna/releases/tag/v4.4.0.

        consider_pruned_trials:
            If this is :obj:`True`, the PRUNED trials are considered for
            sampling.

            .. note::
                Added in v2.0.0 as an experimental feature. The interface may
                change in newer versions without prior notice. See
                https://github.com/optuna/optuna/releases/tag/v2.0.0.

            .. note::
                It is suggested to set this flag :obj:`False` when the
                :class:`~optuna.pruners.MedianPruner` is used. On the other
                hand, it is suggested to set this flag :obj:`True` when the
                :class:`~optuna.pruners.HyperbandPruner` is used. Please see
                `the benchmark result
                <https://github.com/optuna/optuna/pull/1229>`__ for the details.

        use_separable_cma:
            If this is :obj:`True`, the covariance matrix is constrained to be
            diagonal. Because this reduces the model complexity, the learning
            rate for the covariance matrix is increased. Consequently, this
            algorithm outperforms CMA-ES on separable functions.

            .. note::
                Added in v2.6.0 as an experimental feature. The interface may
                change in newer versions without prior notice. See
                https://github.com/optuna/optuna/releases/tag/v2.6.0.

        with_margin:
            If this is :obj:`True`, CMA-ES with margin is used.
            This algorithm prevents samples in each discrete distribution
            (:class:`~optuna.distributions.FloatDistribution` with ``step`` and
            :class:`~optuna.distributions.IntDistribution`) from being fixed to
            a single point. Currently, this option cannot be used with
            ``use_separable_cma=True``.

            .. note::
                Added in v3.1.0 as an experimental feature. The interface may
                change in newer versions without prior notice. See
                https://github.com/optuna/optuna/releases/tag/v3.1.0.

        lr_adapt:
            If this is :obj:`True`, CMA-ES with learning rate adaptation is
            used. This algorithm focuses on working well on multimodal and/or
            noisy problems with default settings. Currently, this option cannot
            be used with ``use_separable_cma=True`` or ``with_margin=True``.

            .. note::
                Added in v3.3.0 as an experimental feature. The interface may
                change in newer versions without prior notice. See
                https://github.com/optuna/optuna/releases/tag/v3.3.0.

        source_trials:
            This option is for Warm Starting CMA-ES, a method to transfer prior
            knowledge on similar HPO tasks through the initialization of CMA-ES.
            This method estimates a promising distribution from
            ``source_trials`` and generates the parameters of a multivariate
            Gaussian distribution. Please note that it is prohibited to use the
            ``x0``, ``sigma0``, or ``use_separable_cma`` arguments together
            with this option.

            .. note::
                Added in v2.6.0 as an experimental feature. The interface may
                change in newer versions without prior notice. See
                https://github.com/optuna/optuna/releases/tag/v2.6.0.
    """  # NOQA: E501

    def __init__(
        self,
        x0: dict[str, Any] | None = None,
        sigma0: float | None = None,
        n_startup_trials: int = 1,
        independent_sampler: BaseSampler | None = None,
        warn_independent_sampling: bool = True,
        seed: int | None = None,
        *,
        consider_pruned_trials: bool = False,
        restart_strategy: str | None = None,
        popsize: int | None = None,
        inc_popsize: int = -1,
        use_separable_cma: bool = False,
        with_margin: bool = False,
        lr_adapt: bool = False,
        source_trials: list[FrozenTrial] | None = None,
    ) -> None:
        if restart_strategy is not None or inc_popsize != -1:
            msg = _deprecated._DEPRECATION_WARNING_TEMPLATE.format(
                name="`restart_strategy`", d_ver="4.4.0", r_ver="6.0.0"
            )
            warnings.warn(
                f"{msg} From v4.4.0 onward, `restart_strategy` automatically "
                "falls back to `None`. "
                "`restart_strategy` will be supported in OptunaHub.",
                FutureWarning,
            )

        self._x0 = x0
        self._sigma0 = sigma0
        self._independent_sampler = independent_sampler or optuna.samplers.RandomSampler(
            seed=seed
        )
        self._n_startup_trials = n_startup_trials
        self._warn_independent_sampling = warn_independent_sampling
        self._cma_rng = LazyRandomState(seed)
        self._search_space = IntersectionSearchSpace()
        self._consider_pruned_trials = consider_pruned_trials
        self._popsize = popsize
        self._use_separable_cma = use_separable_cma
        self._with_margin = with_margin
        self._lr_adapt = lr_adapt
        self._source_trials = source_trials

        if self._use_separable_cma:
            self._attr_prefix = "sepcma:"
        elif self._with_margin:
            self._attr_prefix = "cmawm:"
        else:
            self._attr_prefix = "cma:"

        if self._consider_pruned_trials:
            warn_experimental_argument("consider_pruned_trials")

        if self._use_separable_cma:
            warn_experimental_argument("use_separable_cma")

        if self._source_trials is not None:
            warn_experimental_argument("source_trials")

        if self._with_margin:
            warn_experimental_argument("with_margin")

        if self._lr_adapt:
            warn_experimental_argument("lr_adapt")

        if source_trials is not None and (x0 is not None or sigma0 is not None):
            raise ValueError(
                "It is prohibited to pass `source_trials` argument when "
                "x0 or sigma0 is specified."
            )

        # TODO(c-bata): Support WS-sep-CMA-ES.
        if source_trials is not None and use_separable_cma:
            raise ValueError(
                "It is prohibited to pass `source_trials` argument when using separable CMA-ES."
            )

        if lr_adapt and (use_separable_cma or with_margin):
            raise ValueError(
                "It is prohibited to pass `use_separable_cma` or `with_margin` argument when "
                "using `lr_adapt`."
            )

        # TODO(knshnb): Support sep-CMA-ES with margin.
        if self._use_separable_cma and self._with_margin:
            raise ValueError(
                "Currently, we do not support `use_separable_cma=True` and `with_margin=True`."
            )
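
    # A hedged usage sketch of Warm Starting CMA-ES via ``source_trials``
    # (the study name and storage below are hypothetical):
    #
    #     source_study = optuna.load_study(study_name="similar-task", storage=...)
    #     sampler = optuna.samplers.CmaEsSampler(
    #         source_trials=source_study.trials  # must not be combined with x0/sigma0
    #     )
    #     study = optuna.create_study(sampler=sampler)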

    def reseed_rng(self) -> None:
        # _cma_rng doesn't require reseeding because the relative sampling reseeds in each trial.
        self._independent_sampler.reseed_rng()

    def infer_relative_search_space(
        self, study: "optuna.Study", trial: "optuna.trial.FrozenTrial"
    ) -> dict[str, BaseDistribution]:
        search_space: dict[str, BaseDistribution] = {}
        for name, distribution in self._search_space.calculate(study).items():
            if distribution.single():
                # `cma` cannot handle distributions that contain just a single value, so we skip
                # them. Note that the parameter values for such distributions are sampled in
                # `Trial`.
                continue
            if not isinstance(distribution, (FloatDistribution, IntDistribution)):
                # Categorical distribution is unsupported.
                continue
            search_space[name] = distribution

        return search_space
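
    # Illustration of the filtering above for a hypothetical objective:
    #
    #     x = trial.suggest_float("x", -1.0, 1.0)         # included
    #     n = trial.suggest_int("n", 1, 10)               # included
    #     c = trial.suggest_categorical("c", ["a", "b"])  # excluded (categorical)
    #     k = trial.suggest_float("k", 5.0, 5.0)          # excluded (single value)
    #
    # The relative search space becomes {"x": ..., "n": ...}; "c" is delegated
    # to the independent sampler and "k" is sampled in `Trial` itself.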

    def sample_relative(
        self,
        study: "optuna.Study",
        trial: "optuna.trial.FrozenTrial",
        search_space: dict[str, BaseDistribution],
    ) -> dict[str, Any]:
        self._raise_error_if_multi_objective(study)

        if len(search_space) == 0:
            return {}

        completed_trials = self._get_trials(study)
        if len(completed_trials) < self._n_startup_trials:
            return {}

        if len(search_space) == 1:
            if self._warn_independent_sampling:
                _logger.warning(
                    "`CmaEsSampler` only supports two or more dimensional continuous "
                    "search space. `{}` is used instead of `CmaEsSampler`.".format(
                        self._independent_sampler.__class__.__name__
                    )
                )
                self._warn_independent_sampling = False
            return {}

        # When `with_margin=True`, bounds in discrete dimensions are handled inside `CMAwM`.
        trans = _SearchSpaceTransform(
            search_space, transform_step=not self._with_margin, transform_0_1=True
        )

        optimizer = self._restore_optimizer(completed_trials)
        if optimizer is None:
            optimizer = self._init_optimizer(trans, study.direction)

        if optimizer.dim != len(trans.bounds):
            if self._warn_independent_sampling:
                _logger.warning(
                    "`CmaEsSampler` does not support dynamic search space. "
                    "`{}` is used instead of `CmaEsSampler`.".format(
                        self._independent_sampler.__class__.__name__
                    )
                )
                self._warn_independent_sampling = False
            return {}

        # TODO(c-bata): Reduce the number of wasted trials during parallel optimization.
        # See https://github.com/optuna/optuna/pull/920#discussion_r385114002 for details.
        solution_trials = self._get_solution_trials(completed_trials, optimizer.generation)
        if len(solution_trials) >= optimizer.population_size:
            solutions: list[tuple[np.ndarray, float]] = []
            for t in solution_trials[: optimizer.population_size]:
                assert t.value is not None, "completed trials must have a value"
                if isinstance(optimizer, cmaes.CMAwM):
                    x = np.array(t.system_attrs["x_for_tell"])
                else:
                    x = trans.transform(t.params)
                y = t.value if study.direction == StudyDirection.MINIMIZE else -t.value
                solutions.append((x, y))

            optimizer.tell(solutions)

            # Store optimizer.
            optimizer_str = pickle.dumps(optimizer).hex()
            optimizer_attrs = self._split_optimizer_str(optimizer_str)
            for key in optimizer_attrs:
                study._storage.set_trial_system_attr(trial._trial_id, key, optimizer_attrs[key])

        # Caution: optimizer should update its seed value.
        seed = self._cma_rng.rng.randint(1, 2**16) + trial.number
        optimizer._rng.seed(seed)
        if isinstance(optimizer, cmaes.CMAwM):
            params, x_for_tell = optimizer.ask()
            study._storage.set_trial_system_attr(
                trial._trial_id, "x_for_tell", x_for_tell.tolist()
            )
        else:
            params = optimizer.ask()

        generation_attr_key = self._attr_key_generation
        study._storage.set_trial_system_attr(
            trial._trial_id, generation_attr_key, optimizer.generation
        )
        external_values = trans.untransform(params)

        return external_values
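
    # Sketch of the generation bookkeeping above: every CMA-ES-sampled trial
    # records its generation in system_attrs (e.g. {"cma:generation": 3}).
    # When a later `sample_relative` call finds at least `population_size`
    # completed trials of the optimizer's current generation, it calls
    # `tell()` once and stores the updated (pickled) optimizer on that trial.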

    @property
    def _attr_key_generation(self) -> str:
        return self._attr_prefix + "generation"

    @property
    def _attr_key_optimizer(self) -> str:
        return self._attr_prefix + "optimizer"

    def _concat_optimizer_attrs(self, optimizer_attrs: dict[str, str]) -> str:
        return "".join(
            optimizer_attrs["{}:{}".format(self._attr_key_optimizer, i)]
            for i in range(len(optimizer_attrs))
        )

    def _split_optimizer_str(self, optimizer_str: str) -> dict[str, str]:
        optimizer_len = len(optimizer_str)
        attrs = {}
        for i in range(math.ceil(optimizer_len / _SYSTEM_ATTR_MAX_LENGTH)):
            start = i * _SYSTEM_ATTR_MAX_LENGTH
            end = min((i + 1) * _SYSTEM_ATTR_MAX_LENGTH, optimizer_len)
            attrs["{}:{}".format(self._attr_key_optimizer, i)] = optimizer_str[start:end]
        return attrs

    def _restore_optimizer(
        self,
        completed_trials: "list[optuna.trial.FrozenTrial]",
    ) -> "CmaClass" | None:
        # Restore a previous CMA object.
        for trial in reversed(completed_trials):
            optimizer_attrs = {
                key: value
                for key, value in trial.system_attrs.items()
                if key.startswith(self._attr_key_optimizer)
            }
            if len(optimizer_attrs) == 0:
                continue

            optimizer_str = self._concat_optimizer_attrs(optimizer_attrs)
            return pickle.loads(bytes.fromhex(optimizer_str))
        return None

    def _init_optimizer(
        self,
        trans: _SearchSpaceTransform,
        direction: StudyDirection,
    ) -> "CmaClass":
        lower_bounds = trans.bounds[:, 0]
        upper_bounds = trans.bounds[:, 1]
        n_dimension = len(trans.bounds)

        if self._source_trials is None:
            if self._x0 is None:
                mean = lower_bounds + (upper_bounds - lower_bounds) / 2
            else:
                # `self._x0` is external representations.
                mean = trans.transform(self._x0)

            if self._sigma0 is None:
                sigma0 = np.min((upper_bounds - lower_bounds) / 6)
            else:
                sigma0 = self._sigma0

            cov = None
        else:
            expected_states = [TrialState.COMPLETE]
            if self._consider_pruned_trials:
                expected_states.append(TrialState.PRUNED)

            # TODO(c-bata): Filter parameters by their values instead of checking search space.
            sign = 1 if direction == StudyDirection.MINIMIZE else -1
            source_solutions = [
                (trans.transform(t.params), sign * cast(float, t.value))
                for t in self._source_trials
                if t.state in expected_states
                and _is_compatible_search_space(trans, t.distributions)
            ]
            if len(source_solutions) == 0:
                raise ValueError("No compatible source_trials")

            # TODO(c-bata): Add options to change prior parameters (alpha and gamma).
            mean, sigma0, cov = cmaes.get_warm_start_mgd(source_solutions)

        # Avoid ZeroDivisionError in cmaes.
        sigma0 = max(sigma0, _EPS)

        if self._use_separable_cma:
            return cmaes.SepCMA(
                mean=mean,
                sigma=sigma0,
                bounds=trans.bounds,
                seed=self._cma_rng.rng.randint(1, 2**31 - 2),
                n_max_resampling=10 * n_dimension,
                population_size=self._popsize,
            )
        if self._with_margin:
            steps = np.empty(len(trans._search_space), dtype=float)
            for i, dist in enumerate(trans._search_space.values()):
                assert isinstance(dist, (IntDistribution, FloatDistribution))
                # Set step 0.0 for continuous search space.
                if dist.step is None or dist.log:
                    steps[i] = 0.0
                elif dist.low == dist.high:
                    steps[i] = 1.0
                else:
                    steps[i] = dist.step / (dist.high - dist.low)
            return cmaes.CMAwM(
                mean=mean,
                sigma=sigma0,
                bounds=trans.bounds,
                steps=steps,
                cov=cov,
                seed=self._cma_rng.rng.randint(1, 2**31 - 2),
                n_max_resampling=10 * n_dimension,
                population_size=self._popsize,
            )

        return cmaes.CMA(
            mean=mean,
            sigma=sigma0,
            cov=cov,
            bounds=trans.bounds,
            seed=self._cma_rng.rng.randint(1, 2**31 - 2),
            n_max_resampling=10 * n_dimension,
            population_size=self._popsize,
            lr_adapt=self._lr_adapt,
        )
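
    # Numeric sketch of the defaults in `_init_optimizer`: `sample_relative`
    # builds the transform with `transform_0_1=True`, so each dimension is
    # rescaled to the unit interval. Without `x0`/`sigma0`, the initial mean is
    # therefore 0.5 in every dimension and sigma0 = min(1 - 0) / 6 ≈ 0.167.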

    def sample_independent(
        self,
        study: "optuna.Study",
        trial: "optuna.trial.FrozenTrial",
        param_name: str,
        param_distribution: BaseDistribution,
    ) -> Any:
        self._raise_error_if_multi_objective(study)

        if self._warn_independent_sampling:
            complete_trials = self._get_trials(study)
            if len(complete_trials) >= self._n_startup_trials:
                self._log_independent_sampling(trial, param_name)

        return self._independent_sampler.sample_independent(
            study, trial, param_name, param_distribution
        )

    def _log_independent_sampling(self, trial: FrozenTrial, param_name: str) -> None:
        _logger.warning(
            "The parameter '{}' in trial#{} is sampled independently "
            "by using `{}` instead of `CmaEsSampler` "
            "(optimization performance may be degraded). "
            "`CmaEsSampler` does not support dynamic search space or `CategoricalDistribution`. "
            "You can suppress this warning by setting `warn_independent_sampling` "
            "to `False` in the constructor of `CmaEsSampler`, "
            "if this independent sampling is intended behavior.".format(
                param_name, trial.number, self._independent_sampler.__class__.__name__
            )
        )

    def _get_trials(self, study: "optuna.Study") -> list[FrozenTrial]:
        complete_trials = []
        for t in study._get_trials(deepcopy=False, use_cache=True):
            if t.state == TrialState.COMPLETE:
                complete_trials.append(t)
            elif (
                t.state == TrialState.PRUNED
                and len(t.intermediate_values) > 0
                and self._consider_pruned_trials
            ):
                _, value = max(t.intermediate_values.items())
                if value is None:
                    continue
                # We rewrite the value of the trial `t` for sampling, so we need a deepcopy.
                copied_t = copy.deepcopy(t)
                copied_t.value = value
                complete_trials.append(copied_t)
        return complete_trials

    def _get_solution_trials(
        self, trials: list[FrozenTrial], generation: int
    ) -> list[FrozenTrial]:
        generation_attr_key = self._attr_key_generation
        return [t for t in trials if generation == t.system_attrs.get(generation_attr_key, -1)]
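
    # Sketch of the pruned-trial handling in `_get_trials`: with
    # `consider_pruned_trials=True`, a pruned trial whose intermediate values
    # are {0: 1.2, 5: 0.8} is treated as if it had completed with value 0.8
    # (the value reported at the largest step), using a deepcopy so the cached
    # FrozenTrial is not mutated.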

    def before_trial(self, study: optuna.Study, trial: FrozenTrial) -> None:
        self._independent_sampler.before_trial(study, trial)

    def after_trial(
        self,
        study: "optuna.Study",
        trial: "optuna.trial.FrozenTrial",
        state: TrialState,
        values: Sequence[float] | None,
    ) -> None:
        self._independent_sampler.after_trial(study, trial, state, values)


def _is_compatible_search_space(
    trans: _SearchSpaceTransform, search_space: dict[str, BaseDistribution]
) -> bool:
    intersection_size = len(set(trans._search_space.keys()).intersection(search_space.keys()))
    return intersection_size == len(trans._search_space) == len(search_space)