Source code for gpmpcontrib.computerexperiment

"""
Multi-output deterministic or stochastic computer experiments.

Author: Emmanuel Vazquez <emmanuel.vazquez@centralesupelec.fr>
Copyright (c) 2022-2026, CentraleSupelec
License: GPLv3 (see LICENSE)
"""

from __future__ import annotations

from typing import Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Union

import gpmp.num as gnp
import numpy as np

ArrayLike = Union[np.ndarray, Sequence[float]]
Bounds = Optional[Tuple[float, float]]
FuncSpec = Union[
    Callable[[np.ndarray], np.ndarray],
    Dict[str, Union[Callable[[np.ndarray], np.ndarray], int, List[str], List[Bounds], List[float]]],
]


##############################################################################
#                                                                            #
#                         ComputerExperiment Class                           #
#                                                                            #
##############################################################################


[docs] class ComputerExperiment: """ Represent a multi-output computer experiment evaluating objectives and constraints. You can supply either: - a combined function (or list of functions) via `single_function` / `function_list`, whose outputs are tagged with types, or - separate objectives and constraints via `single_objective`/`objective_list` and `single_constraint`/`constraint_list`. Input domain (`input_box`) is accepted as either shape (2, d) [lower, upper] or (d, 2) [(l1, u1), ..., (ld, ud)]. Internally it is stored as (2, d). Parameters ---------- input_dim : int Dimension of the input space. input_box : array-like Either (2, d) or (d, 2). Bounds must be finite with upper > lower per dimension. single_function, function_list : FuncSpec or list[FuncSpec], optional Combined function(s). Dict entries may include: - "function": callable(X) -> (n, p_i) or (n,) - "output_dim": int (default 1) - "type": list[str] of length output_dim with values in {"objective","constraint"} - "bounds": list[Bounds] or Bounds for constraints per output. Use None for objectives. single_objective, objective_list : FuncSpec or list[FuncSpec], optional Objective function(s). Dict like above. "type" defaults to "objective". single_constraint, constraint_list : FuncSpec or list[FuncSpec], optional Constraint function(s). Dict like above. Provide bounds if not set via `constraint_bounds`. constraint_bounds : list[Bounds], optional Global bounds to attach to constraint functions provided without "bounds". Length must match total number of constraint outputs. Raises ------ ValueError If incompatible inputs are provided or shapes are inconsistent. Notes ----- - `normalize_input`: if True, `eval` assumes x ∈ [0,1]^d and scales to the physical box. You can also override per-call via `eval(..., normalize_input=True/False)`. """ def __init__( self, input_dim: int, input_box: ArrayLike, *, single_function: Optional[FuncSpec] = None, function_list: Optional[List[FuncSpec]] = None, single_objective: Optional[FuncSpec] = None, objective_list: Optional[List[FuncSpec]] = None, single_constraint: Optional[FuncSpec] = None, constraint_list: Optional[List[FuncSpec]] = None, constraint_bounds: Optional[List[Bounds]] = None, ) -> None: # Core dims and domain self.input_dim = int(input_dim) self._parse_and_set_input_box(self.input_dim, input_box) # Normalization (backing field; do NOT call property in __init__) self._normalize_input: bool = False self.input_box = self.input_box_org # current box mirrors original when not normalized # Storage and cache self.functions: List[Dict] = [] self._last_x: Optional[Tuple] = None # cache key: (use_norm: bool, tuple(rows)) self._last_result: Optional[np.ndarray] = None # Validate mutually exclusive API paths self._validate_inputs( single_function, function_list, single_objective, objective_list, single_constraint, constraint_list, ) # Register functions self._setup_functions(function_list, single_function, default_type="function") self._setup_functions(objective_list, single_objective, default_type="objective") self._setup_functions( constraint_list, single_constraint, default_type="constraint", default_bounds=constraint_bounds, ) # Finalize shape metadata self._set_output_dim() # --------------------------- helpers & validation --------------------------- def _parse_and_set_input_box(self, input_dim: int, input_box: ArrayLike) -> None: arr = np.asarray(input_box, dtype=float) if arr.shape == (2, input_dim): lower, upper = arr[0], arr[1] elif arr.shape == (input_dim, 2): lower, upper = arr[:, 0], arr[:, 1] else: raise ValueError( f"input_box must be shape (2, {input_dim}) or ({input_dim}, 2), got {arr.shape}." ) if not (np.all(np.isfinite(lower)) and np.all(np.isfinite(upper))): raise ValueError("input_box bounds must be finite.") if np.any(upper <= lower): raise ValueError("Each upper bound must be strictly greater than the lower bound.") # store as (2, d) self.input_box_org = gnp.array(np.vstack([lower, upper])) def _validate_inputs( self, single_function: Optional[FuncSpec], function_list: Optional[List[FuncSpec]], single_objective: Optional[FuncSpec], objective_list: Optional[List[FuncSpec]], single_constraint: Optional[FuncSpec], constraint_list: Optional[List[FuncSpec]], ) -> None: has_combined = (function_list is not None) or (single_function is not None) has_split = any( x is not None for x in (objective_list, single_objective, constraint_list, single_constraint) ) if has_combined and has_split: raise ValueError( "Provide either combined functions ('function_list'/'single_function') " "OR split objectives/constraints, but not both." ) def _to_list(self, item: Optional[Union[FuncSpec, List[FuncSpec]]]) -> List[FuncSpec]: if item is None: return [] return item if isinstance(item, list) else [item] def _wrap_in_dict(self, item: FuncSpec, default_type: str) -> Dict: if isinstance(item, dict): if "function" not in item: raise ValueError("Dictionary func spec must include key 'function'.") item = dict(item) # shallow copy item.setdefault("output_dim", 1) item.setdefault("type", [default_type] * int(item["output_dim"])) if len(item["type"]) != int(item["output_dim"]): raise ValueError( f"Length of 'type' ({len(item['type'])}) must match 'output_dim' ({item['output_dim']})." ) else: item = {"function": item, "output_dim": 1, "type": [default_type]} return item def _handle_constraint_bounds( self, func_dict: Dict, default_bounds: Optional[List[Bounds]], bounds_index: int, ) -> Tuple[Dict, int]: if "bounds" in func_dict: return func_dict, bounds_index if default_bounds is None: raise ValueError("Constraint function must have 'bounds' (or provide 'constraint_bounds').") d = int(func_dict["output_dim"]) if d == 1: if bounds_index >= len(default_bounds): raise ValueError("Not enough entries in 'constraint_bounds'.") b = default_bounds[bounds_index] if b is not None and (not isinstance(b, (tuple, list)) or len(b) != 2): raise ValueError("Each bounds entry must be a (lb, ub) tuple or None.") func_dict["bounds"] = b bounds_index += 1 else: if bounds_index + d > len(default_bounds): raise ValueError("Not enough entries in 'constraint_bounds' for multi-output constraint.") bs = default_bounds[bounds_index : bounds_index + d] for b in bs: if b is not None and (not isinstance(b, (tuple, list)) or len(b) != 2): raise ValueError("Each bounds entry must be a (lb, ub) tuple or None.") func_dict["bounds"] = bs bounds_index += d return func_dict, bounds_index def _setup_functions( self, func_list: Optional[List[FuncSpec]], single_func: Optional[FuncSpec], default_type: str, default_bounds: Optional[List[Bounds]] = None, ) -> None: funcs = self._to_list(func_list) + self._to_list(single_func) bounds_index = 0 for f in funcs: func_dict = self._wrap_in_dict(f, default_type) if default_type == "constraint": func_dict, bounds_index = self._handle_constraint_bounds(func_dict, default_bounds, bounds_index) self.functions.append(func_dict) def _set_output_dim(self) -> None: self.output_dim: int = int(sum(int(func["output_dim"]) for func in self.functions)) # ------------------------------ dunder & info ------------------------------ def __repr__(self) -> str: return self.__str__() def __str__(self) -> str: lines: list[str] = [] lines.append("Computer Experiment object") lines.append(f" Input Dimension : {self.input_dim}") if self._normalize_input: lines.append(f" Input Box (original): {self._format_box(self.input_box_org)}") lines.append(" Normalized Inputs : True") lines.append(f" Input Box (current) : {self._format_box(self.input_box)}") else: lines.append(f" Input Box : {self._format_box(self.input_box_org)}") lines.append(" Normalized Inputs : False") lines.append(f" Output Dimension : {self.output_dim}\n") for i, func in enumerate(self.functions, 1): lines.append(f" ** Function {i} **") lines.append(f" Type : {func['type']}") fn = func["function"] lines.append(f" Function : {fn.__name__ if callable(fn) else fn}") lines.append(f" Output Dim : {func['output_dim']}") if "bounds" in func: lines.append(f" Bounds : {func['bounds']}") if "simulated_noise_variance" in func: lines.append(f" Noise Var : {func['simulated_noise_variance']}") return "\n".join(lines) def _format_box(self, box: np.ndarray) -> str: return "[" + ", ".join(f"({lo:g}, {hi:g})" for lo, hi in zip(box[0], box[1])) + "]" # ------------------------------ configuration ------------------------------ @property def normalize_input(self) -> bool: """Whether inputs are normalized to [0,1]^d before evaluation.""" return self._normalize_input @normalize_input.setter def normalize_input(self, flag: bool) -> None: """Enable or disable input normalization. Clears evaluation cache.""" self._normalize_input = bool(flag) self.input_box = ( gnp.array([[0.0] * self.input_dim, [1.0] * self.input_dim]) if self._normalize_input else self.input_box_org ) self._last_x = None self._last_result = None # ------------------------------ metadata APIs ------------------------------
[docs] def get_output_types(self) -> List[str]: """List of output types aligned with columns of `eval` result.""" return [t for f in self.functions for t in f["type"]]
[docs] def get_output_bounds(self) -> List[Bounds]: """List of (lb, ub) or None per output, aligned with columns of `eval` result.""" out: List[Bounds] = [] for f in self.functions: d = int(f["output_dim"]) b = f.get("bounds", None) if d == 1: out.append(b if isinstance(b, (tuple, list)) else None) else: if b is None: out.extend([None] * d) else: out.extend(list(b)) return out
[docs] def get_constraint_bounds(self) -> np.ndarray: """Array of bounds (or None) for outputs whose type == 'constraint'.""" types = self.get_output_types() bounds = self.get_output_bounds() return np.asarray([b for t, b in zip(types, bounds) if t == "constraint"], dtype=object)
# --------------------------------- eval APIs -------------------------------- def __call__(self, x: ArrayLike, *, normalize_input: Optional[bool] = None) -> np.ndarray: return self.eval(x, normalize_input=normalize_input)
[docs] def eval(self, x: ArrayLike, *, normalize_input: Optional[bool] = None) -> np.ndarray: """ Evaluate all outputs at x. Parameters ---------- x : array-like Shape (n, d) or (d,). If 1D, it is reshaped to (1, d). normalize_input : Optional[bool] Per-call override. None = use object-level setting. Returns ------- ndarray Shape (n, output_dim). """ X = np.asarray(x, dtype=float) if X.ndim == 1: if X.size != self.input_dim: raise ValueError(f"Expected vector of length {self.input_dim}, got {X.size}.") X = X.reshape(1, -1) elif X.ndim == 2 and X.shape[1] == self.input_dim: pass else: raise ValueError(f"Input x must be (n, {self.input_dim}) or ({self.input_dim},), got {X.shape}.") use_norm = self.normalize_input if normalize_input is None else bool(normalize_input) # cache key includes normalization flag and the exact X values x_tuple = tuple(map(tuple, X)) cache_key = (use_norm, x_tuple) if self._last_x is not None and self._last_x == cache_key: return self._last_result # type: ignore[return-value] X_eval = X if use_norm: lower = np.asarray(self.input_box_org[0], dtype=float) upper = np.asarray(self.input_box_org[1], dtype=float) X_eval = lower + (upper - lower) * X result = self._eval_functions(self.functions, X_eval) self._last_x = cache_key self._last_result = result return result
[docs] def eval_objectives(self, x: ArrayLike, *, normalize_input: Optional[bool] = None) -> np.ndarray: z = self.eval(x, normalize_input=normalize_input) idx = [i for i, t in enumerate(self.get_output_types()) if t == "objective"] return z[:, idx] if idx else np.zeros((z.shape[0], 0))
[docs] def eval_constraints(self, x: ArrayLike, *, normalize_input: Optional[bool] = None) -> np.ndarray: z = self.eval(x, normalize_input=normalize_input) idx = [i for i, t in enumerate(self.get_output_types()) if t == "constraint"] return z[:, idx] if idx else np.zeros((z.shape[0], 0))
# -------------------------- internal function eval -------------------------- def _eval_functions(self, function_dicts: List[Dict], X: np.ndarray) -> np.ndarray: """Evaluate functions and concatenate outputs (n, p_total).""" if X.ndim != 2 or X.shape[1] != self.input_dim: raise ValueError(f"X must be shape (n, {self.input_dim}), got {X.shape}.") outputs: List[np.ndarray] = [] for func in function_dicts: f = func["function"] d = int(func["output_dim"]) z = f(X) # expected shapes: (n,) or (n, d) z = np.asarray(z, dtype=float) if z.ndim == 1: z = z[:, np.newaxis] elif z.ndim == 2 and z.shape[1] != d: raise ValueError( f"The function output dimension {z.shape[1]} does not match the expected output dimension {d}." ) elif z.ndim > 2: raise ValueError(f"Function output must be 1D or 2D, got shape {z.shape}.") outputs.append(z) return np.concatenate(outputs, axis=1) if outputs else np.empty((X.shape[0], 0), dtype=float)
############################################################################## # # # StochasticComputerExperiment Class # # # ##############################################################################
[docs] class StochasticComputerExperiment(ComputerExperiment): """ Extension of `ComputerExperiment` that simulates additive Gaussian noise on outputs. Noise configuration: - Provide a global `simulated_noise_variance` (scalar or list with length = total outputs), or - Provide per-function dictionaries with "simulated_noise_variance" (list length = that function's output_dim), or - Omit / set zeros for noise-free outputs. Methods differ from base by adding stochastic evaluation with optional batching. Notes ----- - `eval_objectives` / `eval_constraints` are not supported here because repeated calls would mix stochastic draws and caching is disabled for stochastic evals. """ def __init__( self, input_dim: int, input_box: ArrayLike, *, single_function: Optional[FuncSpec] = None, function_list: Optional[List[FuncSpec]] = None, single_objective: Optional[FuncSpec] = None, objective_list: Optional[List[FuncSpec]] = None, single_constraint: Optional[FuncSpec] = None, constraint_list: Optional[List[FuncSpec]] = None, simulated_noise_variance: Optional[Union[float, List[float]]] = None, ) -> None: super().__init__( input_dim=input_dim, input_box=input_box, single_function=single_function, function_list=function_list, single_objective=single_objective, objective_list=objective_list, single_constraint=single_constraint, constraint_list=constraint_list, ) self._initialize_noise_variance(self.functions, simulated_noise_variance) # ------------------------------- noise config ------------------------------- def _initialize_noise_variance( self, function_dicts: List[Dict], simulated_noise_variance: Optional[Union[float, List[float]]] ) -> None: # Expand scalar to per-output list if np.isscalar(simulated_noise_variance): total_dim = sum(int(f["output_dim"]) for f in function_dicts) simulated_noise_variance = [float(simulated_noise_variance)] * total_dim if simulated_noise_variance is not None: total_dim = sum(int(f["output_dim"]) for f in function_dicts) if len(simulated_noise_variance) != total_dim: raise ValueError( f"Length of 'simulated_noise_variance' ({len(simulated_noise_variance)}) " f"must equal total output_dim ({total_dim})." ) start = 0 for f in function_dicts: d = int(f["output_dim"]) f["simulated_noise_variance"] = list(np.asarray(simulated_noise_variance[start : start + d], dtype=float)) start += d else: for f in function_dicts: if "simulated_noise_variance" not in f: f["simulated_noise_variance"] = [0.0] * int(f["output_dim"]) @property def simulated_noise_variance(self) -> np.ndarray: return self.get_simulated_noise_variances()
[docs] def get_simulated_noise_variances(self) -> np.ndarray: return np.concatenate([np.asarray(func["simulated_noise_variance"], dtype=float) for func in self.functions])
# ------------------------------ dunder & info ------------------------------ def __str__(self) -> str: lines: list[str] = [] lines.append("Stochastic Computer Experiment") lines.append(f" Input Dimension : {self.input_dim}") if self._normalize_input: lines.append(f" Input Box (original): {self._format_box(self.input_box_org)}") lines.append(" Normalized Inputs : True") lines.append(f" Input Box (current) : {self._format_box(self.input_box)}") else: lines.append(f" Input Box : {self._format_box(self.input_box_org)}") lines.append(" Normalized Inputs : False") lines.append(f" Output Dimension : {self.output_dim}\n") for i, func in enumerate(self.functions, 1): lines.append(f" ** Function {i} **") lines.append(f" Type : {func['type']}") fn = func["function"] lines.append(f" Function : {fn.__name__ if callable(fn) else fn}") lines.append(f" Output Dim : {func['output_dim']}") lines.append(f" Noise Var : {func['simulated_noise_variance']}") if "bounds" in func: lines.append(f" Bounds : {func['bounds']}") return "\n".join(lines) # --------------------------------- eval APIs -------------------------------- def __call__( self, x: ArrayLike, *, simulate_noise: bool = True, batch_size: int = 1, rng: Optional[np.random.Generator] = None, ) -> np.ndarray: return self.eval(x, simulate_noise=simulate_noise, batch_size=batch_size, rng=rng)
[docs] def eval( self, x: ArrayLike, *, simulate_noise: bool = True, batch_size: int = 1, rng: Optional[np.random.Generator] = None, ) -> np.ndarray: """ Evaluate all outputs at x with optional simulated Gaussian noise. Parameters ---------- x : array-like Shape (n, d) or (d,). If 1D, it is reshaped to (1, d). simulate_noise : bool, default True Whether to add Gaussian noise using configured variances. batch_size : int, default 1 If >1, returns shape (n, p, batch_size). If 1, returns (n, p). rng : numpy.random.Generator, optional Random generator for reproducibility. If None, a default RNG is used. Returns ------- ndarray (n, p) if batch_size == 1. Otherwise (n, p, batch_size). """ if batch_size < 1: raise ValueError("batch_size must be >= 1") # Input validation & normalization (no caching for stochastic evals) X = np.asarray(x, dtype=float) if X.ndim == 1: if X.size != self.input_dim: raise ValueError(f"Expected vector of length {self.input_dim}, got {X.size}.") X = X.reshape(1, -1) elif X.ndim == 2 and X.shape[1] == self.input_dim: pass else: raise ValueError(f"Input x must be (n, {self.input_dim}) or ({self.input_dim},), got {X.shape}.") if self.normalize_input: lower = np.asarray(self.input_box_org[0], dtype=float) upper = np.asarray(self.input_box_org[1], dtype=float) X = lower + (upper - lower) * X rng = rng if rng is not None else np.random.default_rng() draws = [self._eval_functions_with_noise(self.functions, X, simulate_noise, rng) for _ in range(batch_size)] return draws[0] if batch_size == 1 else np.dstack(draws)
[docs] def eval_objectives(self, x: ArrayLike, **kwargs) -> np.ndarray: # type: ignore[override] raise NotImplementedError("eval_objectives is not supported in StochasticComputerExperiment.")
[docs] def eval_constraints(self, x: ArrayLike, **kwargs) -> np.ndarray: # type: ignore[override] raise NotImplementedError("eval_constraints is not supported in StochasticComputerExperiment.")
# -------------------------- internal function eval -------------------------- def _eval_functions_with_noise( self, function_dicts: List[Dict], X: np.ndarray, simulate_noise: bool, rng: np.random.Generator, ) -> np.ndarray: if X.ndim != 2 or X.shape[1] != self.input_dim: raise ValueError(f"X must be shape (n, {self.input_dim}), got {X.shape}.") outs: List[np.ndarray] = [] for func in function_dicts: f = func["function"] d = int(func["output_dim"]) z = f(X) z = np.asarray(z, dtype=float) if z.ndim == 1: z = z[:, np.newaxis] elif z.ndim == 2 and z.shape[1] != d: raise ValueError(f"The function output dimension {z.shape[1]} does not match the expected {d}.") elif z.ndim > 2: raise ValueError(f"Function output must be 1D or 2D, got shape {z.shape}.") if simulate_noise: var = np.asarray(func.get("simulated_noise_variance", [0.0] * d), dtype=float) if var.shape != (d,): raise ValueError(f"'simulated_noise_variance' must have length {d}.") mask = var > 0.0 if np.any(mask): std = np.sqrt(var[mask]) z[:, mask] = z[:, mask] + rng.normal(0.0, std, size=(z.shape[0], mask.sum())) outs.append(z) return np.hstack(outs) if outs else np.empty((X.shape[0], 0), dtype=float)