"""Sequential Decision-Making Strategies Module
This module implements sequential decision-making strategies.
It defines a base class, SequentialStrategy, that handles evaluations, model updates,
and sampling criterion computations. Derived classes implement specific strategies:
- SequentialStrategyGridSearch: uses a fixed candidate set for grid search.
- SequentialStrategySMC: adapts the search space using Sequential Monte Carlo (SMC).
- SequentialStrategyBSS: adapts the search space via Bayesian Subset Simulation (BSS) principles.
Authors: Emmanuel Vazquez <emmanuel.vazquez@centralesupelec.fr>
Copyright (c) 2023-2025, CentraleSupelec
License: GPLv3 (see LICENSE)
"""
import time
import pickle
import gpmp.num as gnp
from gpmp.mcmc.smc import SMC, ParticlesSetConfig, SMCConfig
from gpmpcontrib import SequentialPrediction
[docs]
class SequentialStrategy(SequentialPrediction):
"""Sequential decision-making via a sampling criterion.
This abstract class extends SequentialPrediction to incorporate a
sampling criterion for sequential decision-making. It selects new points
based on the criterion and updates the predictive model accordingly.
Parameters
----------
problem : object
Problem instance with the objective function.
model : object
Predictive model.
options : dict, optional
Strategy configuration. Options may include:
maximize_criterion : bool, default True.
update_model_at_init : bool, default True.
update_estimate_at_init : bool, default True.
update_search_space_at_init : bool, default False.
Attributes
----------
computer_experiments_problem : object
Problem instance.
options : dict
Strategy options.
nt : int
Number of candidates / search points.
xt : ndarray
Candidate points.
zpm, zpv : ndarray or None
Posterior mean and variance on candidate points.
current_estimate : None
Current best estimate.
sampling_criterion_values : array_like
Sampling criterion values.
n_iter : int
Iteration counter.
exec_times : dict
Timing metrics.
history : dict
Evaluation history with keys:
- "estimates": recorded estimates,
- "criterion_best": best criterion value per iteration,
- "model_params": snapshots of model parameters.
maximize_criterion : bool
If True, the criterion is maximized. If False, it is minimized.
"""
def __init__(self, problem, model, options=None):
super().__init__(model=model) # Ensure proper parent class initialization
self.computer_experiments_problem = problem
self.options = self.set_options(options or {})
self.xt = self.set_initial_xt()
self.nt = self.xt.shape[0] if self.xt is not None else None
self.zpm = None
self.zpv = None
self.current_estimate = None
self.sampling_criterion_values = None
self.n_iter = 0
self.exec_times = {}
self.history = {"estimates": [], "criterion_best": [], "model_params": []}
self.maximize_criterion = self.options.get("maximize_criterion", True)
def set_options(self, options):
default_options = {
"update_model_at_init": True,
"update_predictions_at_init": True,
"update_estimate_at_init": True,
"update_search_space_at_init": False,
"maximize_criterion": True,
}
default_options.update(options or {})
return default_options
def set_initial_xt(self):
raise NotImplementedError("get_initial_xt must be implemented.")
def set_initial_design(
self,
xi,
update_model=True,
update_predictions=True,
update_estimate=True,
update_search_space_at_init=False,
):
update_model = self.options.get("update_model_at_init", update_model)
update_estimate = self.options.get("update_estimate_at_init", update_estimate)
update_search_space_at_init = self.options.get(
"update_search_space_at_init", update_search_space_at_init
)
tic = time.time()
zi = self.computer_experiments_problem.eval(xi)
if update_model:
super().set_data_with_model_selection(xi, zi)
else:
super().set_data(xi, zi)
if update_predictions:
self.update_predictions() # initial predictions
if update_search_space_at_init:
self.update_search_space()
if update_predictions:
self.update_predictions() # search space may be modified, update predictions
if update_estimate:
self.update_current_estimate()
self.exec_times["initial_design"] = time.time() - tic
def update_predictions(self):
tic = time.time()
self.zpm, self.zpv = self.predict(self.xt, convert_out=False, use_cache=True)
self.exec_times["update_predictions"] = time.time() - tic
def update_current_estimate(self, *args, **kwargs):
raise NotImplementedError("update_current_estimate must be implemented.")
def update_search_space(self, *args, **kwargs):
raise NotImplementedError("update_search_space must be implemented.")
def sampling_criterion(self, *args, **kwargs):
raise NotImplementedError("sampling_criterion must be implemented.")
def update_sampling_criterion_values(self):
self.sampling_criterion_values = self.sampling_criterion()
def select_best_index(self, criterion_values):
arr = gnp.asarray(criterion_values)
return gnp.argmax(arr) if self.maximize_criterion else gnp.argmin(arr)
def make_new_eval(self, xnew, update_model=True):
tic = time.time()
znew = self.computer_experiments_problem.eval(xnew)
self.exec_times["new_eval"] = time.time() - tic
tic = time.time()
if update_model:
self.set_new_eval_with_model_selection(xnew, znew)
else:
self.set_new_eval(xnew, znew)
self.exec_times["update_model"] = time.time() - tic
[docs]
def get_model_params(self):
"""Return a snapshot of model parameters from the ModelContainer."""
params = []
for m in self.model.models:
mp = m["model"].meanparam
cp = m["model"].covparam
# Store copies to avoid later modifications
params.append(
{
"meanparam": gnp.copy(mp) if mp is not None else None,
"covparam": gnp.copy(cp) if cp is not None else None,
}
)
return params
[docs]
def get_model_state(self):
"""Return the pickleable model state."""
return self.model.get_state()
def step(self):
raise NotImplementedError("step must be implemented in the subclass.")
def save_state(self, filename="state.pkl"):
state = {
"n_iter": self.n_iter,
"history": self.history,
"exec_times": self.exec_times,
"current_estimate": self.current_estimate,
}
with open(filename, "wb") as f:
pickle.dump(state, f)
def load_state(self, filename="state.pkl"):
with open(filename, "rb") as f:
state = pickle.load(f)
self.n_iter = state.get("n_iter", 0)
self.history = state.get("history", {})
self.exec_times = state.get("exec_times", {})
self.current_estimate = state.get("current_estimate", None)
[docs]
class SequentialStrategyGridSearch(SequentialStrategy):
"""Sequential strategy using a fixed candidate set / point collection (PC).
Parameters
----------
problem : object
Problem instance.
model : object
Predictive model.
xt : array_like
Fixed candidate points.
options : dict, optional
Additional options. Defaults for PC:
update_model_at_init = True,
update_estimate_at_init = True,
update_search_space_at_init = False.
Attributes
----------
"""
def __init__(self, problem, model, xt, options=None):
if options is None:
options = {}
options.setdefault("update_model_at_init", True)
options.setdefault("update_estimate_at_init", True)
options.setdefault("update_predictions_at_init", True)
options.setdefault("update_search_space_at_init", False)
self._xt = xt # Store xt in a temporary attribute for set_initial_xt()
super().__init__(problem, model, options)
def set_initial_xt(self):
return self._xt
def step(self):
step_start = time.time()
self.update_sampling_criterion_values()
best_idx = self.select_best_index(self.sampling_criterion_values)
x_new = self.xt[best_idx].reshape(1, -1)
self.make_new_eval(x_new, update_model=True)
self.update_predictions()
self.update_current_estimate()
self.n_iter += 1
self.history.setdefault("eval_indices", []).append(int(best_idx))
self.history.setdefault("criterion_best", []).append(
gnp.to_scalar(self.sampling_criterion_values[best_idx])
)
# Save a snapshot of the model parameters.
self.history.setdefault("model_params", []).append(self.get_model_params())
self.exec_times["step"] = time.time() - step_start
# ==============================================================================
# Sequential Strategy with SMC for Search Space Adaptation
# ==============================================================================
[docs]
class SequentialStrategySMC(SequentialStrategy):
"""Sequential strategy using SMC to adapt the search space.
This class augments the base strategy with SMC functionalities. It updates
the search space using an SMC sampler.
Parameters
----------
problem : object
Problem instance.
model : object
Predictive model.
options : dict, optional
Strategy configuration. Options for SMC default to:
n_smc = 1000,
update_model_at_init = True,
update_estimate_at_init = True,
update_search_space_at_init = True.
Attributes
----------
smc : SMC
SMC instance for adapting the search space.
smc_log_density_param_initial : any
Initial parameter for SMC target density.
smc_log_density_param : any
Current parameter for SMC target density.
"""
def __init__(self, problem, model, options=None):
if options is None:
options = {}
options.setdefault("n_smc", 1000)
options.setdefault("initial_distribution_type", "randunif")
options.setdefault("update_model_at_init", True)
options.setdefault("update_predictions_at_init", True)
options.setdefault("update_estimate_at_init", True)
options.setdefault("update_search_space_at_init", True)
options.setdefault("update_method", "step_with_possible_restart")
self.computer_experiments_problem = problem
# Merge SMC-specific options with sequentialstrategy defaults.
self.options = self.set_options(options)
self.smc = self.init_smc(
self.computer_experiments_problem.input_box,
self.options["n_smc"],
self.options["initial_distribution_type"],
)
self.smc_log_density_param_initial = None
self.smc_log_density_param = None
super().__init__(problem, model, options)
[docs]
def init_smc(self, box, n_smc, initial_distribution_type):
"""Initialize the SMC instance using the problem's input box.
Notes
-----
At initialization, particles are distributed according to the initial distribution
"""
particles_config = ParticlesSetConfig(
initial_distribution_type=initial_distribution_type,
resample_scheme="residual",
)
smc_config = SMCConfig(compute_next_logpdf_param_method="p0", mh_steps=40)
return SMC(
box, n=n_smc, particles_config=particles_config, smc_config=smc_config
)
def set_initial_xt(self):
return self.smc.particles.x
def smc_log_density(self, *args, **kwargs):
raise NotImplementedError("smc_log_density must be implemented.")
def update_smc_target_log_density_param(self):
raise NotImplementedError("update_smc_log_density_param must be implemented.")
[docs]
def update_search_space(self):
"""Update the SMC search space using the specified method."""
method = self.options["update_method"]
self.update_smc_target_log_density_param()
if method == "simple_step":
self.smc.step(
logpdf_parameterized_function=self.smc_log_density,
logpdf_param=self.smc_log_density_param,
)
elif method == "restart":
self.smc.restart(
logpdf_parameterized_function=self.smc_log_density,
logpdf_initial_param=self.smc_log_density_param_initial,
logpdf_final_param=self.smc_log_density_param,
p0=0.8,
debug=True,
)
elif method == "step_with_possible_restart":
self.smc.step_with_possible_restart(
logpdf_parameterized_function=self.smc_log_density,
initial_logpdf_param=self.smc_log_density_param_initial,
target_logpdf_param=self.smc_log_density_param,
min_ess_ratio=0.6,
p0=0.6,
debug=False,
)
self.xt = self.smc.particles.x
def step(self):
step_start = time.time()
# Evaluate the sampling criterion on SMC particles.
self.update_sampling_criterion_values()
best_idx = self.select_best_index(self.sampling_criterion_values)
x_new = self.xt[best_idx].reshape(1, -1)
self.make_new_eval(x_new, update_model=True)
# Update current predictions & estimate for SMC
self.update_predictions()
self.update_current_estimate()
# Update the SMC search space.
self.update_search_space()
# Update predictions & current estimate at the end of the step
self.update_predictions()
self.update_current_estimate()
self.n_iter += 1
self.history.setdefault("eval_indices", []).append(int(best_idx))
self.history.setdefault("criterion_best", []).append(
gnp.to_scalar(self.sampling_criterion_values[best_idx])
)
self.history.setdefault("model_params", []).append(self.get_model_params())
self.exec_times["step"] = time.time() - step_start
# ==============================================================================
# Sequential Strategy with BSS for Search Space Adaptation
# ==============================================================================
[docs]
class SequentialStrategyBSS(SequentialStrategy):
"""Sequential strategy using Bayesian Subset Simulation (BSS) for search space adaptation.
This extends `SequentialStrategy` by incorporating a Sequential
Monte Carlo (SMC) sampler to refine the search space based on
Bayesian Subset Simulation principles.
Parameters
----------
problem : object
The optimization problem.
model : object
Predictive model (e.g., Gaussian process).
options : dict, optional
Strategy configuration:
- n_smc (int, default 1000): Number of SMC particles.
- initial_distribution_type (str, default "randunif"): Initial particle distribution.
- update_model_at_init (bool, default True): Update model at initialization.
- update_predictions_at_init (bool, default True): Update predictions at initialization.
- update_estimate_at_init (bool, default True): Update estimate at initialization.
- update_search_space_at_init (bool, default True): Update search space at initialization.
Attributes
----------
smc : SMC
SMC sampler used for search space adaptation.
smc_log_density_param : any
Current parameter for the SMC target density.
"""
def __init__(self, problem, model, options=None):
if options is None:
options = {}
options.setdefault("n_smc", 1000)
options.setdefault("initial_distribution_type", "randunif")
options.setdefault("update_model_at_init", True)
options.setdefault("update_predictions_at_init", True)
options.setdefault("update_estimate_at_init", True)
options.setdefault("update_search_space_at_init", True)
self.computer_experiments_problem = problem
# Merge SMC-specific options with sequentialstrategy defaults.
self.options = self.set_options(options)
self.smc = self.init_smc(
self.computer_experiments_problem.input_box,
self.options["n_smc"],
self.options["initial_distribution_type"],
)
self.smc_log_density_param_initial = None
self.smc_log_density_param = None
super().__init__(problem, model, options)
[docs]
def init_smc(self, box, n_smc, initial_distribution_type):
"""Initialize the SMC instance using the problem's input box.
Notes
-----
At initialization, particles are distributed according to the initial distribution
"""
particles_config = ParticlesSetConfig(
initial_distribution_type=initial_distribution_type,
resample_scheme="residual",
)
smc_config = SMCConfig(compute_next_logpdf_param_method="p0", mh_steps=40)
return SMC(
box, n=n_smc, particles_config=particles_config, smc_config=smc_config
)
def set_initial_xt(self):
return self.smc.particles.x
def smc_log_density(self, *args, **kwargs):
raise NotImplementedError("smc_log_density must be implemented.")
def set_smc_target_log_density_param(self):
raise NotImplementedError("set_smc_log_density_param must be implemented.")
def set_smc_current_log_density_param(self):
raise NotImplementedError("set_smc_log_density_param must be implemented.")
[docs]
def update_search_space(self):
"""Update the SMC search space using the specified method."""
self.smc.step(
logpdf_parameterized_function=self.smc_log_density,
logpdf_param=self.smc_log_density_param,
)
self.xt = self.smc.particles.x
def step_new_evaluation(self):
step_start = time.time()
# Evaluate the sampling criterion on SMC particles.
self.update_sampling_criterion_values()
best_idx = self.select_best_index(self.sampling_criterion_values)
x_new = self.xt[best_idx].reshape(1, -1)
self.make_new_eval(x_new, update_model=True)
# Update current predictions & estimate for SMC
self.update_predictions()
self.update_current_estimate()
self.n_iter += 1
self.history.setdefault("eval_indices", []).append(int(best_idx))
self.history.setdefault("criterion_best", []).append(
gnp.to_scalar(self.sampling_criterion_values[best_idx])
)
self.history.setdefault("model_params", []).append(self.get_model_params())
self.exec_times["step"] = time.time() - step_start
def step_move_particles(self):
step_start = time.time()
# Update the SMC search space.
self.update_search_space()
# Update predictions & current estimate at the end of the step
self.update_predictions()
self.update_current_estimate()
self.exec_times["step"] = time.time() - step_start
def restart(self, update_search_space=True, update_estimate=True):
self.smc = self.init_smc(
self.computer_experiments_problem.input_box,
self.options["n_smc"],
self.options["initial_distribution_type"],
)
# Update the SMC search space.
if update_search_space:
self.update_search_space()
if update_estimate:
self.update_current_estimate()