"""
Gaussian Process Model Container
This module defines a `ModelContainer` object as a wrapper around the
`Model` object in `gpmp`, simplifying the creation and management of
Gaussian Process models. It provides tools for choosing the mean and
covariance functions, and accommodates multiple outputs. The module
supports parameter selection using user-provided methods (ML, REML...).
It is used by `SequentialPrediction` and `SequentialStrategy`.
Author: Emmanuel Vazquez <emmanuel.vazquez@centralesupelec.fr>
Copyright: 2022-2026, CentraleSupelec
License: GPLv3 (refer to LICENSE file)
"""
import time
from dataclasses import dataclass
import gpmp.num as gnp
import gpmp as gp
from gpmp.parameter import Param
[docs]
class AttrDict(dict):
"""Dict with attribute access to keys (d.k <-> d['k']).
Missing keys raise AttributeError.
"""
def __getattr__(self, key):
try:
return self[key]
except KeyError:
raise AttributeError(f"'AttrDict' object has no attribute '{key}'")
def __setattr__(self, key, value):
self[key] = value
def __delattr__(self, key):
try:
del self[key]
except KeyError:
raise AttributeError(f"'AttrDict' object has no attribute '{key}'")
def __repr__(self):
# Return a representation that lists only the keys.
return f"AttrDict({list(self.keys())})"
[docs]
@dataclass
class SelectionCriterionBuildContext:
"""Context passed when building a per-output selection criterion."""
model: object
xi: object = None
zi: object = None
def _unbuilt_selection_criterion(*args, **kwargs):
raise RuntimeError(
"Selection criterion is not built yet. Run select_params(xi, zi) first."
)
_unbuilt_selection_criterion._gpmp_unbuilt = True
# ==============================================================================
# ModelContainer
# ==============================================================================
class ModelContainer:
"""
Container for one or more `gpmp.core.Model` instances (multi-output GP).
This class streamlines the creation and management of Gaussian Process
models with possibly different mean/covariance choices per output. It also
wires in parameter selection (e.g., ML/REML), prediction/LOO utilities,
optional conditional simulations, light-weight diagnostics wrappers, and
basic serialization.
Parameters
----------
name : str
Model name (free text).
output_dim : int
Number of outputs (i.e., number of independent GP models managed).
parameterized_mean : bool
If True, the GP uses a *parameterized* mean (you provide the number of
mean parameters via `param_length`). If False, it uses a *linear
predictor* mean (design matrix) and the mean parameter vector is
empty/implicit.
mean_specification : dict | list[dict]
Mean specification(s). Either a single dict applied to all outputs, or
a list of dicts (one per output). Each dict may be either
- {"function": callable, "param_length": int} to use a provided mean
function; or
- free-form arguments consumed by your subclass implementation of
`build_mean_function(output_idx, param)` which must return
`(mean_callable, param_length)`.
covariance_specification : dict | list[dict]
Covariance specification(s). Same broadcasting rules as `mean_specification`.
Each dict may be either
- {"function": callable} to use a provided covariance function; or
- free-form arguments consumed by your subclass implementation of
`build_covariance(output_idx, param)` which must return a callable.
initial_guess_procedures : callable | list[callable] | None
Initial-guess procedure(s) for parameter optimization. Either a single
callable applied to all outputs or a list (one per output). If None,
they are built via `build_parameters_initial_guess_procedure(...)`.
A procedure must accept `(model, xi, zi)` and return either
`(meanparam0, covparam0)` (parameterized mean) or `covparam0`.
selection_criteria : callable | list[callable] | None
Selection criterion/criteria used during parameter estimation. Either a
single callable applied to all outputs or a list (one per output). If
None, they are built via `build_selection_criterion(...)` when needed
(lazy build, typically in `select_params(...)`). A criterion
must match the expected model signature:
- linear-predictor mean: `criterion(model, covparam, xi, zi)`
- parameterized mean: `criterion(model, meanparam, covparam, xi, zi)`
Attributes
----------
name : str
Model name.
output_dim : int
Number of outputs.
parameterized_mean : bool
Whether a parameterized mean is used.
models : list[AttrDict]
For each output, an `AttrDict` with keys:
- "output_name" : str
- "model" : gpmp.core.Model
- "mean_fname" : str
- "mean_paramlength" : int
- "covariance_fname" : str
- "param" : gpmp.parameter.Param | None
- "param_from_vectors" : callable
- "param_to_vectors" : callable
- "get_param" : callable
- "apply_param" : callable
- "parameters_initial_guess_procedure" : callable | None
- "selection_criterion" : callable
- "info" : object | None (optimizer/selection report; filled by `select_params`)
Because each entry is an ``AttrDict``, both dict-style and attribute-style
access are supported, e.g. ``self.models[k]["get_param"]`` and
``self.models[k].get_param()`` are equivalent.
Key Methods (overview)
----------------------
Construction / config
make_mean_functions, make_covariance_functions
build_mean_function (override in subclass)
build_covariance (override in subclass)
Parameter selection
make_parameters_initial_guess_procedures
make_selection_criteria
make_param_object_procedures
select_params
Inference
predict : joint predictions over all outputs
loo : leave-one-out over all outputs
compute_conditional_simulations
Diagnostics (wrappers)
run_diagnosis : per-output wrapper of `gp.modeldiagnosis.diag`
run_perf : per-output wrapper of `gp.modeldiagnosis.perf`
plot_selection_criterion_crosssections
plot_selection_criterion_profile_2d
selection_criterion_stats_fast
selection_criterion_stats
Sampling
sample_parameters
Serialization
get_state, set_state, save_state, load_state
Notes
-----
* For multi-output problems, all arrays passed to `predict`, `loo`, and
wrappers should have shape `(n, output_dim)` for targets (unless you pick
a specific output via the wrapper’s `output_ind`/`model_index`).
* Subclasses must implement `build_mean_function` and `build_covariance`
(and usually the two `build_*` selection helpers) when you don’t pass
explicit callables in `*_params`.
"""
# --------------------------------------------------------------------------
# Construction & representation
# --------------------------------------------------------------------------
def __init__(
self,
name,
output_dim,
parameterized_mean,
mean_specification,
covariance_specification,
initial_guess_procedures=None,
selection_criteria=None,
):
"""Create a multi-output GP container with per-output mean/covariance."""
self.name = name
self.output_dim = output_dim
self.rebuild_selection_criterion_on_select_params = False
self.selection_criterion_build_policy = "static"
self._selection_criteria_spec = selection_criteria
self.parameterized_mean = parameterized_mean
mean_type = "parameterized" if self.parameterized_mean else "linear_predictor"
# mean_functions : list[callable] -> one mean function per
# mean_functions_info : list[dict] -> per-output dicts
# with keys `"description"` and `"param_length"`
# covariance_functions : list[callable] -> one covariance function per output.
mean_functions, mean_functions_info = self.make_mean_functions(
mean_specification
)
covariance_functions = self.make_covariance_functions(covariance_specification)
# Build per-output models
self.models = []
for i in range(output_dim):
model = gp.core.Model(
mean_functions[i],
covariance_functions[i],
meanparam=None,
covparam=None,
meantype=mean_type,
)
self.models.append(
AttrDict(
{
"output_name": f"output{i}",
"model": model,
"mean_fname": mean_functions_info[i]["description"],
"mean_paramlength": mean_functions_info[i]["param_length"],
"covariance_fname": getattr(
model.covariance,
"__name__",
type(model.covariance).__name__,
),
"param": None, # a Param instance from gpmp.parameter
"parameters_initial_guess_procedure": None,
"selection_criterion": _unbuilt_selection_criterion,
"info": None,
}
)
)
# Install initial-guess procedures.
# Selection criteria are built lazily in select_params(...), where xi/zi are available.
parameters_initial_guess_procedures = (
self.make_parameters_initial_guess_procedures(initial_guess_procedures)
)
for i in range(self.output_dim):
self.models[i]["parameters_initial_guess_procedure"] = (
parameters_initial_guess_procedures[i]
)
# Install get_param / apply_param on each models[i]
param_procedures = self.make_param_object_procedures(
param_procedures=None,
auxiliary_params=[
{"name_prefix": f"z{i}_"} for i in range(self.output_dim)
],
)
for i, (pf, pt) in enumerate(param_procedures):
model_entry = self.models[i]
model_entry["param_from_vectors"] = pf
model_entry["param_to_vectors"] = pt
model_entry["param"] = None
def _get_param(pf=pf, model_entry=model_entry):
m = model_entry.model
P = pf(m.meanparam, m.covparam)
model_entry["param"] = P
return P
def _apply_param(P, pt=pt, model_entry=model_entry):
m = model_entry.model
mean_vec, cov_vec = pt(P)
m.meanparam = mean_vec
m.covparam = cov_vec
model_entry["param"] = P
# expose as methods on the per-output record
model_entry["get_param"] = _get_param
model_entry["apply_param"] = _apply_param
def __getitem__(self, index):
"""
Allows accessing the individual models and their attributes using the index.
Parameters
----------
index : int
The index of the model to access.
Returns
-------
dict
The dictionary containing the model and its associated attributes.
"""
return self.models[index]
def __repr__(self):
return self.__str__()
def __str__(self):
"""Human-readable description of all sub-models."""
s = [f"Model Name: {self.name}, Output Dimension: {self.output_dim}"]
for i, model in enumerate(self.models):
# mean_descr = self.mean_functions_info[i]["description"] #FIXME
mean_type = model["model"].meantype
mean_params = model["model"].meanparam
cov_fn = model["model"].covariance
covariance = getattr(cov_fn, "__name__", type(cov_fn).__name__)
cov_params = model["model"].covparam
ig = model["parameters_initial_guess_procedure"]
sc = model["selection_criterion"]
ig_name = getattr(ig, "__name__", type(ig).__name__) if ig else "None"
sc_name = getattr(sc, "__name__", type(sc).__name__) if sc else "None"
s.append(
"\n".join(
[
f"\nGaussian process {i}:",
f" Output Name: {model['output_name']}",
# f" Mean: {mean_descr}",
f" Mean Type: {mean_type}",
f" Mean Parameters: {mean_params}",
f" Covariance: {covariance}",
f" Covariance Parameters: {cov_params}",
f" Initial Guess Procedure: {ig_name}",
f" Selection Criterion: {sc_name}",
]
)
)
return "\n".join(s)
# --------------------------------------------------------------------------
# Mean / covariance configuration (builders included)
# --------------------------------------------------------------------------
def make_mean_functions(self, mean_specification):
"""Make mean functions.
This function sets mean functions based on the parameters
provided in mean_specification. Each entry in mean_specification should
specify a mean function and its associated parameters.
Parameters
----------
mean_specification : list of dict or dict
Parameters for defining mean functions. Can be a single
dictionary applied to all outputs or a list of
dictionaries, one for each output. Dictionary may include:
- 'function': A callable mean function to be used
- 'param_length': The length of the mean function's
parameter vector, required if 'function' is given and is
a callable.
Returns
-------
tuple of list, list
A tuple containing a list of mean function callables and a
list of their descriptions.
Raises
------
ValueError
If mean_specification are not correctly specified or do not match
output_dim.
"""
if isinstance(mean_specification, dict):
mean_specification = [mean_specification] * self.output_dim
elif (
not isinstance(mean_specification, list)
or len(mean_specification) != self.output_dim
):
raise ValueError(
"mean_specification must be a dict or a list of dicts of length output_dim"
)
mean_functions = []
mean_functions_info = []
for i, param in enumerate(mean_specification):
if "function" in param and callable(param["function"]):
mean_function = param["function"]
if "param_length" not in param:
raise ValueError(
"'param_length' is required for callable mean functions"
)
param_length = param["param_length"]
else:
mean_function, param_length = self.build_mean_function(i, param)
mean_functions.append(mean_function)
desc = getattr(mean_function, "__name__", type(mean_function).__name__)
mean_functions_info.append(
{"description": desc, "param_length": param_length}
)
return mean_functions, mean_functions_info
def make_covariance_functions(self, covariance_specification):
"""
Make one covariance function per output.
This method sets covariance functions based on the parameters
provided in `covariance_specification`. Each entry should specify a
covariance function and its associated parameters.
Parameters
----------
covariance_specification : list of dict or dict
Parameters for defining covariance functions. Can be a
single dictionary applied to all outputs or a list of
dictionaries, one for each output. Each dictionary may
have a 'function' key providing a callable covariance
function.
Returns
-------
list
A list of covariance function callables.
Raises
------
ValueError
If `covariance_specification` is not correctly specified or does not
match output_dim.
"""
if isinstance(covariance_specification, dict):
covariance_specification = [covariance_specification] * self.output_dim
elif (
not isinstance(covariance_specification, list)
or len(covariance_specification) != self.output_dim
):
raise ValueError(
"covariance_specification must be a dict or a list of dicts of length output_dim"
)
covariance_functions = []
for i, param in enumerate(covariance_specification):
if "function" in param and callable(param["function"]):
covariance_function = param["function"]
else:
covariance_function = self.build_covariance(i, param)
covariance_functions.append(covariance_function)
return covariance_functions
def build_mean_function(self, output_idx: int, param: dict):
"""Build and return `(mean_callable, n_mean_params)` for the given output.
Override in subclasses.
Parameters
----------
output_idx : int
The index of the output for which the covariance function
is being created.
param : dict
Additional parameters for the mean function
Returns
-------
(callable, int)
The corresponding mean function and the number of parameters.
"""
raise NotImplementedError
def build_covariance(self, output_idx: int, param: dict):
"""
Build and return a covariance callable for the given output.
Override in subclasses.
Parameters
----------
output_idx : int
The index of the output for which the covariance function
is being created.
param : dict
Additional parameters for the covariance function
Returns
-------
callable
A covariance function.
"""
raise NotImplementedError
# --------------------------------------------------------------------------
# Param / Selection-criterion plumbing (builders + wrappers)
# --------------------------------------------------------------------------
def make_param_object_procedures(
self, param_procedures=None, auxiliary_params=None
):
"""
Return per-output Param adapter procedures.
This procedure normalizes user input (None, a single (pf, pt) pair, or a
per-output list of pairs) into a validated list of (pf, pt) pairs, so the
rest of the code can index procedures[i] unconditionally.
If param_procedures is None:
build via build_param_procedures(i, **bp) per output.
If a single pair (pf, pt) is passed:
broadcast to all outputs.
If a list of pairs is passed:
length must equal output_dim.
pf, pt meanings
---------------
pf ("param_from_vectors"):
Callable that packs model parameter vectors into a Param object:
pf(meanparam, covparam) -> Param
pt ("param_to_vectors"):
Callable that unpacks a Param object into model parameter vectors:
pt(Param) -> (meanparam, covparam)
"""
# broadcast build_params
if not isinstance(auxiliary_params, list):
auxiliary_params = [auxiliary_params] * self.output_dim
if len(auxiliary_params) != self.output_dim:
raise ValueError("Length of auxiliary_params must match output_dim")
# Case 1: build
if param_procedures is None:
return [
self.build_param_procedures(i, **(bp or {}))
for i, bp in enumerate(auxiliary_params)
]
# Case 2: single pair - broadcast
if (
isinstance(param_procedures, tuple)
and len(param_procedures) == 2
and callable(param_procedures[0])
and callable(param_procedures[1])
):
return [param_procedures] * self.output_dim
# Case 3: list of pairs -> validate
if isinstance(param_procedures, list):
if len(param_procedures) != self.output_dim:
raise ValueError("param_procedures must have length output_dim")
for j, pair in enumerate(param_procedures):
if (
not isinstance(pair, tuple)
or len(pair) != 2
or not callable(pair[0])
or not callable(pair[1])
):
raise TypeError(
f"param_procedures[{j}] must be a (callable, callable) tuple"
)
return param_procedures
raise TypeError(
"param_procedures must be None, a (callable, callable) pair, or a list of such pairs"
)
def make_parameters_initial_guess_procedures(
self, initial_guess_procedures=None, auxiliary_params=None
):
"""
Return initial-guess procedures (one per output).
This procedure normalizes user input (None, a single callable, or a
per-output list) into a validated list of per-output procedures, so the
rest of the code can index procedures[i] unconditionally.
initial_guess_procedures may be:
- None: build per output via build_parameters_initial_guess_procedure(...)
- callable: broadcast to all outputs
- list[callable]: one per output (length must be output_dim)
auxiliary_params may be:
- None: each builder gets {}
- dict: broadcast to all outputs
- list[dict|None]: one per output (length must be output_dim)
"""
# Normalize auxiliary_params -> list of dicts (or {})
if auxiliary_params is None:
build_params = [{} for _ in range(self.output_dim)]
elif isinstance(auxiliary_params, dict):
build_params = [auxiliary_params] * self.output_dim
elif isinstance(auxiliary_params, list):
if len(auxiliary_params) != self.output_dim:
raise ValueError("Length of auxiliary_params must match output_dim")
build_params = [(p or {}) for p in auxiliary_params]
else:
raise TypeError("auxiliary_params must be None, a dict, or a list of dicts")
# Normalize initial_guess_procedures -> list of callables
if initial_guess_procedures is None:
procedures = [
self.build_parameters_initial_guess_procedure(i, **build_params[i])
for i in range(self.output_dim)
]
elif callable(initial_guess_procedures):
procedures = [initial_guess_procedures] * self.output_dim
elif isinstance(initial_guess_procedures, list):
if len(initial_guess_procedures) != self.output_dim:
raise ValueError("initial_guess_procedures must have length output_dim")
if not all(callable(p) for p in initial_guess_procedures):
raise TypeError("initial_guess_procedures must contain only callables")
procedures = initial_guess_procedures
else:
raise TypeError(
"initial_guess_procedures must be None, a callable, or a list of callables"
)
return procedures
def make_selection_criteria(
self, selection_criteria=None, auxiliary_params=None, xi=None, zi=None
):
"""
Return selection criteria (one per output).
This procedure normalizes user input (None, a single callable, or a
per-output list) into a validated list of per-output criteria, so the rest
of the code can index criteria[i] unconditionally.
Parameters
----------
selection_criteria : callable | list[callable] | None
Criterion procedure(s). If None, criteria are built via
build_selection_criterion(...). If a callable is provided, it is
broadcast to all outputs. If a list is provided, its length must match
output_dim.
auxiliary_params : dict | list[dict] | None
Per-output parameters passed to build_selection_criterion when
selection_criteria is None. If a dict is provided, it is broadcast to
all outputs. If a list is provided, its length must match output_dim.
xi : array_like, optional
Observation points, shape (n, d). Used only when
``self.selection_criterion_build_policy == "needs_observations"``.
If an explicit ``selection_criteria`` callable/list is provided,
``xi`` is ignored.
zi : array_like, optional
Observed values, shape (n,) or (n, output_dim). Used only when
``self.selection_criterion_build_policy == "needs_observations"``.
If an explicit ``selection_criteria`` callable/list is provided,
``zi`` is ignored.
Returns
-------
list[callable]
A list of selection criterion callables, one per output.
Notes
-----
Whether observations are required at criterion-build time is controlled by
``selection_criterion_build_policy == "needs_observations"``.
"""
needs_observations = self.selection_criterion_build_policy == "needs_observations"
if selection_criteria is None and needs_observations:
if xi is None or zi is None:
raise ValueError(
"xi and zi must be provided when "
"selection criteria require observations."
)
xi_ = gnp.asarray(xi)
zi_ = gnp.asarray(zi)
if zi_.ndim == 1:
zi_ = zi_.reshape(-1, 1)
if int(zi_.shape[1]) != self.output_dim:
raise ValueError("zi second dimension must match output_dim.")
else:
xi_ = None
zi_ = None
# Normalize auxiliary_params -> list of dicts
if auxiliary_params is None:
build_params = [{} for _ in range(self.output_dim)]
elif isinstance(auxiliary_params, dict):
build_params = [auxiliary_params] * self.output_dim
elif isinstance(auxiliary_params, list):
if len(auxiliary_params) != self.output_dim:
raise ValueError("Length of auxiliary_params must match output_dim")
build_params = [(p or {}) for p in auxiliary_params]
else:
raise TypeError("auxiliary_params must be None, a dict, or a list of dicts")
# Normalize selection_criteria -> list of callables
if selection_criteria is None:
contexts = [
SelectionCriterionBuildContext(
model=self.models[i]["model"],
xi=xi_ if needs_observations else None,
zi=zi_[:, i] if needs_observations else None,
)
for i in range(self.output_dim)
]
criteria = [
self.build_selection_criterion(
i, context=contexts[i], **build_params[i]
)
for i in range(self.output_dim)
]
elif callable(selection_criteria):
criteria = [selection_criteria] * self.output_dim
elif isinstance(selection_criteria, list):
if len(selection_criteria) != self.output_dim:
raise ValueError("selection_criteria must have length output_dim")
if not all(callable(c) for c in selection_criteria):
raise TypeError("selection_criteria must contain only callables")
criteria = selection_criteria
else:
raise TypeError(
"selection_criteria must be None, a callable, or a list of callables"
)
return criteria
@staticmethod
def _is_unbuilt_selection_criterion(criterion):
return bool(getattr(criterion, "_gpmp_unbuilt", False))
def _ensure_selection_criteria(
self, *, xi=None, zi=None, rebuild=False, selection_criteria=None
):
"""Ensure per-output selection criteria are available.
Parameters
----------
xi : array_like, optional
Observation points used when criteria must be built from
``build_selection_criterion`` and the build policy requires data.
zi : array_like, optional
Observation values used when criteria must be built from
``build_selection_criterion`` and the build policy requires data.
rebuild : bool, default False
If True, rebuild criteria (even if already built).
selection_criteria : callable | list[callable] | None, optional
Optional override for the criterion spec. If None, uses the
constructor-provided ``self._selection_criteria_spec``.
"""
if selection_criteria is None:
selection_criteria = self._selection_criteria_spec
criteria_missing = any(
ModelContainer._is_unbuilt_selection_criterion(
model_entry.get("selection_criterion", None)
)
for model_entry in self.models
)
if not (rebuild or criteria_missing):
return
built = self.make_selection_criteria(
selection_criteria=selection_criteria,
xi=xi,
zi=zi,
)
for i in range(self.output_dim):
self.models[i]["selection_criterion"] = built[i]
def build_param_procedures(self, output_idx: int, **kwargs):
"""Return Param procedures"""
raise NotImplementedError
def build_parameters_initial_guess_procedure(self, output_idx: int, **build_param):
"""Build an initial guess procedure for anisotropic parameters.
Override in subclass.
Returns
-------
function
A function to compute initial guesses for anisotropic parameters.
"""
raise NotImplementedError
def build_selection_criterion(self, output_idx: int, context=None, **build_params):
"""Override in subclass to construct a selection criterion."""
raise NotImplementedError
# --------------------------------------------------------------------------
# Parameter selection
# --------------------------------------------------------------------------
def select_params(
self,
xi,
zi,
*,
force_param_initial_guess=True,
param0=None,
use_bounds_from_param_obj=True,
bounds=None,
bounds_factory=None,
bounds_delta=None,
method="SLSQP",
method_options=None,
rebuild_selection_criterion=None,
):
"""
Parameter selection (per output).
Parameters
----------
xi : array_like
Observation points, shape (n, d).
zi : array_like
Observed values. Shape (n,) for single-output, or (n, output_dim).
force_param_initial_guess : bool, default True
Controls initializer selection when ``param0`` is not provided.
If True, compute an initial guess even when the model already has
parameters. If False and the model already has parameters, those
are reused as the starting point.
param0 : None | array_like | Param | list[ array_like | Param ]
Optional initializer(s) for optimization. For multi-output:
- provide a list where param0[i] is either a vector in the
model's normalized coordinates or a Param for output i.
For single-output, you may pass a single vector or a Param.
If a vector is passed, it must be the concatenation of mean
parameters (length = mean_paramlength) followed by covariance
parameters. If a Param is passed, the method uses the per-output
adapter param_to_vectors to extract (mean, cov).
Precedence rule: when ``param0`` is provided, it is always used
as initializer and ``force_param_initial_guess`` is ignored.
use_bounds_from_param_obj : bool, default True
If True and a per-output Param object is available, bounds are
taken from Param.bounds (normalized coordinates) unless overridden
by manual bounds or a bounds_factory.
bounds : None | array_like(n_params, 2) | list[array_like(n_params, 2)]
Manual bounds in normalized space. If provided for multi-output,
either supply a single (n_params, 2) array to broadcast to all
outputs, or a list with one (n_params, 2) array per output.
These take precedence over bounds from Param or bounds_factory.
bounds_factory : None | callable
Optional callback to compute bounds from data and the initializer:
bounds_factory(model_dict, xi_array, zi_vector, param0_init_vector)
It must return an array of shape (n_params, 2) or None.
Used only if bounds is None; takes precedence over Param bounds.
bounds_delta : None | float | array_like
Local interval around the initializer param0_init in normalized space.
If provided, for each parameter theta0 we build [theta0 - delta, theta0 + delta].
If scalar, the same delta is used for all parameters; if an array, the
length must match the parameter dimension. This interval is then
intersected with whichever base bounds were chosen by precedence:
manual bounds > bounds_factory(...) > Param bounds > None
method : {"SLSQP", "L-BFGS-B"}, default "SLSQP"
Optimization method passed to ``gp.kernel.autoselect_parameters``.
method_options : dict or None, default None
Additional method-specific SciPy options. Keys are forwarded to
``scipy.optimize.minimize(..., options=...)`` through
``gp.kernel.autoselect_parameters``.
rebuild_selection_criterion : bool | None, default None
Whether to rebuild per-output selection criteria at call time.
If None, uses ``self.rebuild_selection_criterion_on_select_params``.
If True, criteria are rebuilt via ``build_selection_criterion`` before
optimization. If ``self.selection_criterion_build_policy ==
"needs_observations"``, ``xi`` and ``zi`` are forwarded to
``make_selection_criteria``.
Criteria are also built automatically on first call when missing.
Notes
-----
Initializer precedence per output:
1. ``param0[i]`` when ``param0`` is provided.
2. Otherwise, if ``force_param_initial_guess=True`` or model covparam is
missing: ``parameters_initial_guess_procedure``.
3. Otherwise: current model parameters.
"""
xi_ = gnp.asarray(xi)
zi_ = gnp.asarray(zi)
if zi_.ndim == 1:
zi_ = zi_.reshape(-1, 1)
if rebuild_selection_criterion is None:
rebuild_selection_criterion = (
self.rebuild_selection_criterion_on_select_params
)
self._ensure_selection_criteria(
xi=xi_,
zi=zi_,
rebuild=bool(rebuild_selection_criterion),
)
# Normalize param0 container for multi-output
if param0 is not None:
if self.output_dim == 1:
if isinstance(param0, (list, tuple)):
# Single-output: treat [scalar, ...] / (scalar, ...) as a parameter vector,
# and [entry] as an explicit one-item container.
if len(param0) == 1:
try:
is_scalar0 = gnp.asarray(param0[0]).ndim == 0
except Exception:
is_scalar0 = False
param0 = [param0] if is_scalar0 else list(param0)
else:
param0 = [param0]
else:
param0 = [param0]
elif not isinstance(param0, (list, tuple)):
raise TypeError(
"For multi-output models, param0 must be a list/tuple with one entry per output."
)
if len(param0) != self.output_dim:
raise ValueError(
f"param0 must have length {self.output_dim}, got {len(param0)}."
)
for i in range(self.output_dim):
tic = time.time()
model = self.models[i]
mpl = int(model["mean_paramlength"])
# 1) Initializer (meanparam0, covparam0, param0_init)
p0_i = None if param0 is None else param0[i]
meanparam0, covparam0, param0_init = (
ModelContainer._resolve_param0_init_for_output(
model,
xi_,
zi_[:, i],
force_param_initial_guess=force_param_initial_guess,
param0_entry=p0_i,
)
)
n_params = int(param0_init.shape[0])
# 2) Base bounds by precedence: manual > factory > Param > None
base_bounds = ModelContainer._extract_bounds_for_output(
bounds, i, n_params, output_dim=self.output_dim
)
if base_bounds is None and callable(bounds_factory):
base_bounds = bounds_factory(model, xi_, zi_[:, i], param0_init)
if base_bounds is not None:
base_bounds = ModelContainer._as_bounds_array(base_bounds, n_params)
if base_bounds is None and use_bounds_from_param_obj:
# Use existing Param if present; otherwise build a temporary one from the initializer.
P = model.get("param", None)
if P is None and callable(model.get("param_from_vectors", None)):
P = model["param_from_vectors"](meanparam0, covparam0)
if P is not None:
base_bounds = ModelContainer._bounds_from_param_obj(P)
# 3) Local +-delta interval around param0_init (intersect)
if bounds_delta is not None:
local_interval = ModelContainer._interval_around(
param0_init, bounds_delta
)
base_bounds = ModelContainer._intersect_bounds(
base_bounds, local_interval
)
# 4) Optimize
selection_criterion = model["selection_criterion"]
meanparam_len = model["mean_paramlength"]
crit, crit_pre_grad, crit_no_grad, crit_grad = (
gp.kernel.make_selection_criterion_with_gradient(
model["model"],
selection_criterion,
xi=xi_,
zi=zi_[:, i],
dataloader=None,
parameterized_mean=True if meanparam_len > 0 else False,
meanparam_len=meanparam_len,
)
)
# crit, dcrit, crit_nograd = self.make_selection_criterion_with_gradient(
# model, xi_, zi_[:, i]
# )
param, info = gp.kernel.autoselect_parameters(
param0_init,
crit_pre_grad,
crit_grad,
bounds=base_bounds,
silent=True,
info=True,
method=method,
method_options=method_options,
)
# 5) Write back params
model["model"].meanparam = gnp.asarray(param[:mpl])
model["model"].covparam = gnp.asarray(param[mpl:])
# 6) Refresh Param object (pf must be pf(meanparam, covparam))
if callable(model.get("get_param", None)):
model["param"] = model["get_param"]()
elif callable(model.get("param_from_vectors", None)):
model["param"] = model["param_from_vectors"](
model["model"].meanparam, model["model"].covparam
)
# Mirror final bounds into Param.bounds (normalized space)
ModelContainer._apply_bounds_to_param(
model.get("param", None), base_bounds, mpl
)
# 7) Record info
model["info"] = info
model["info"]["meanparam0"] = meanparam0
model["info"]["covparam0"] = covparam0
model["info"]["param0"] = param0_init
model["info"]["meanparam"] = model["model"].meanparam
model["info"]["covparam"] = model["model"].covparam
model["info"]["param"] = param
model["info"]["selection_criterion"] = crit
model["info"]["selection_criterion_nograd"] = crit_no_grad
model["info"]["time"] = time.time() - tic
# --------------------------------------------------------------------------
# Inference / prediction APIs
# --------------------------------------------------------------------------
def predict(self, xi, zi, xt, convert_in=True, convert_out=True):
"""
Predictions over all outputs.
Returns
-------
(zpm, zpv) :
Mean and variance arrays of shape (m, output_dim).
"""
xi, zi, xt = self._ensure_shapes_and_type(
xi=xi, zi=zi, xt=xt, convert=convert_in
)
zpm_ = gnp.empty((xt.shape[0], self.output_dim))
zpv_ = gnp.empty((xt.shape[0], self.output_dim))
for i in range(self.output_dim):
zpm_i, zpv_i = self.models[i]["model"].predict(
xi, zi[:, i], xt, convert_in=convert_in, convert_out=False
)
zpm_[:, i] = zpm_i
zpv_[:, i] = zpv_i
if convert_out:
return gnp.to_np(zpm_), gnp.to_np(zpv_)
return zpm_, zpv_
def loo(self, xi, zi, convert_in=True, convert_out=True):
"""
Perform leave-one-out (LOO) cross-validation for each model within the container.
Parameters
----------
xi : ndarray
Input data points, where each row represents a data point.
zi : ndarray
Output values corresponding to xi. Should have a shape of (n_samples, output_dim) where
n_samples is the number of data points and output_dim is the number of outputs.
convert_in : bool, optional
If True, converts input data to the appropriate type for processing (default is True).
convert_out : bool, optional
If True, converts output data to numpy array type before returning (default is True).
Returns
-------
(zloo, sigma2loo, eloo) :
Arrays of shape (n, output_dim).
"""
xi, zi, _ = self._ensure_shapes_and_type(xi=xi, zi=zi, convert=convert_in)
zloo_ = gnp.empty((xi.shape[0], self.output_dim))
sigma2loo_ = gnp.empty((xi.shape[0], self.output_dim))
eloo_ = gnp.empty((xi.shape[0], self.output_dim))
for i in range(self.output_dim):
zloo_i, sigma2loo_i, eloo_i = self.models[i]["model"].loo(
xi, zi[:, i], convert_in=convert_in, convert_out=False
)
zloo_[:, i] = zloo_i
sigma2loo_[:, i] = sigma2loo_i
eloo_[:, i] = eloo_i
if convert_out:
return gnp.to_np(zloo_), gnp.to_np(sigma2loo_), gnp.to_np(eloo_)
return zloo_, sigma2loo_, eloo_
def compute_conditional_simulations(
self,
xi,
zi,
xt,
n_samplepaths=1,
type="intersection",
method="svd",
convert_in=True,
convert_out=True,
):
"""
Generate conditional sample paths based on input data and simulation points.
Parameters
----------
xi : ndarray(ni, d)
Input data points used in the GP model.
zi : ndarray(ni, output_dim)
Observations at the input data points xi.
xt : ndarray(nt, d)
Points at which to simulate.
n_samplepaths : int, optional
Number of sample paths to generate. Default is 1.
type : str, optional
Specifies the relationship between xi and xt. Can be 'intersection'
(xi and xt may have a non-empty intersection) or 'disjoint'
(xi and xt must be disjoint). Default is 'intersection'.
method : str, optional
Method to draw unconditional sample paths. Can be 'svd' or 'chol'. Default is 'svd'.
Returns
-------
ndarray
An array of conditional sample paths at simulation points xt.
The shape of the array is (nt, n_samplepaths) for a single output model,
and (nt, n_samplepaths, output_dim) for multi-output models.
FIXME
-----
Allow an optional zsim argument.
"""
xi_, zi_, xt_ = self._ensure_shapes_and_type(
xi=xi, zi=zi, xt=xt, convert=convert_in
)
ni, nt = xi_.shape[0], xt_.shape[0]
xtsim = gnp.vstack((xi_, xt_))
if type == "intersection":
xtsim, indices = gnp.unique(xtsim, return_inverse=True, axis=0)
xtsim_xi_ind = indices[0:ni]
xtsim_xt_ind = indices[ni : (ni + nt)]
elif type == "disjoint":
xtsim_xi_ind = gnp.arange(ni)
xtsim_xt_ind = gnp.arange(nt) + ni
else:
raise ValueError("type must be 'intersection' or 'disjoint'")
# unconditional sample paths
n = xtsim.shape[0]
zsim = gnp.empty((n, n_samplepaths, self.output_dim))
for i in range(self.output_dim):
zsim_i = self.models[i]["model"].sample_paths(
xtsim, n_samplepaths, method=method
)
zsim[:, :, i] = zsim_i
# conditional
zpsim = gnp.empty((nt, n_samplepaths, self.output_dim))
for i in range(self.output_dim):
zpm, zpv, lambda_t = self.models[i]["model"].predict(
xi_,
zi_[:, i],
xtsim[xtsim_xt_ind],
return_lambdas=True,
convert_in=False,
convert_out=False,
)
if self.models[i]["model"].meantype == "linear_predictor":
zpsim_i = self.models[i]["model"].conditional_sample_paths(
zsim[:, :, i],
xtsim_xi_ind,
zi_[:, i],
xtsim_xt_ind,
lambda_t,
convert_out=False,
)
elif self.models[i]["model"].meantype == "parameterized":
zpsim_i = self.models[i][
"model"
].conditional_sample_paths_parameterized_mean(
zsim[:, :, i],
xi_,
xtsim_xi_ind,
zi_[:, i],
xt_,
xtsim_xt_ind,
lambda_t,
convert_out=False,
)
else:
raise ValueError(
f"Unsupported meantype {self.models[i]['model'].meantype}"
)
zpsim[:, :, i] = zpsim_i
if self.output_dim == 1:
zpsim = zpsim.reshape((zpsim.shape[0], zpsim.shape[1]))
if convert_out:
zpsim = gnp.to_np(zpsim)
return zpsim
# --------------------------------------------------------------------------
# Diagnostics
# --------------------------------------------------------------------------
def _indices_or_single(self, output_ind):
"""Yield all output indices, or a single index if provided."""
return range(self.output_dim) if output_ind is None else [output_ind]
def _validate_and_slice_targets(self, z, output_ind):
"""
Ensure targets have the right shape and return a column-picker.
Returns
-------
(z_, take_col_fn)
`z_` is the validated array. `take_col_fn(arr, k)` returns the
1D target for output k (no-op if `z_` is already 1D).
"""
z_ = gnp.asarray(z)
if z_.ndim == 1:
# ok for single-output, or for multi-output when a specific output_ind is given
if self.output_dim != 1 and output_ind is None:
raise ValueError(
"zi is 1D but model is multi-output; provide output_ind or a 2D zi."
)
take_col_fn = lambda arr, k: arr # no-op
else:
if z_.shape[1] != self.output_dim:
raise ValueError(
f"zi must have {self.output_dim} columns; got {z_.shape[1]}."
)
take_col_fn = lambda arr, k: arr[:, k]
return z_, take_col_fn
def run_diagnosis(
self, xi, zi, *, output_ind: int | None = None, convert_in: bool = True
):
"""
Wrapper around `gp.modeldiagnosis.diag`.
If `output_ind` is None, runs for all outputs; otherwise only for the
specified output.
"""
xi_ = gnp.asarray(xi) if convert_in else xi
zi_, take_col_fn = self._validate_and_slice_targets(zi, output_ind)
for k in self._indices_or_single(output_ind):
print(f"~ Model [{k}]")
info = self.models[k]["info"]
if info is None:
raise ValueError(
f"Output {k}: no selection info. Run select_params() first."
)
zi_k = zi_ if zi_.ndim == 1 else take_col_fn(zi_, k)
gp.modeldiagnosis.diag(
self.models[k]["model"],
info,
xi_,
zi_k,
param_obj=self.models[k].get("param", None),
)
def run_perf(
self,
xi,
zi,
*,
output_ind: int | None = None,
loo: bool = True,
loo_res=None, # (zloom, zloov, eloo) — 1D or 2D; slice per k if 2D
xtzt=None, # (xt, zt) — slice zt per k if 2D
zpmzpv=None, # (zpm, zpv) — slice BOTH per k if 2D
convert_in: bool = True,
):
"""
Wrapper around `gp.modeldiagnosis.perf`.
If `output_ind` is None, runs for all outputs; otherwise only for the
specified output. Supports providing precomputed tuples (`loo_res`,
`xtzt`, `zpmzpv`) either as 1D or 2D arrays.
"""
xi_ = gnp.asarray(xi) if convert_in else xi
zi_, take_col_fn = self._validate_and_slice_targets(zi, output_ind)
# Optional test tuples (convert xt/zt if provided)
if xtzt is not None:
xt, zt = xtzt
xt_ = gnp.asarray(xt) if convert_in else xt
zt_ = gnp.asarray(zt) if convert_in else zt
else:
xt_ = zt_ = None
# Slicers ----------------------------------------------------
def _slice_test(tup, k):
"""For (xt, zt): xt shared, zt sliced if 2D."""
if tup is None:
return None
a, b = tup
b_k = b if getattr(b, "ndim", 1) == 1 else b[:, k]
return (a, b_k)
def _slice_pred(tup, k):
"""For (zpm, zpv): slice BOTH if 2D."""
if tup is None:
return None
a, b = tup
s = lambda arr: arr if getattr(arr, "ndim", 1) == 1 else arr[:, k]
return (s(a), s(b))
def _slice_loo(tup, k):
if tup is None:
return None
zloom, zloov, eloo = tup
s = lambda arr: arr if getattr(arr, "ndim", 1) == 1 else arr[:, k]
return (s(zloom), s(zloov), s(eloo))
# ------------------------------------------------------------
for k in self._indices_or_single(output_ind):
print(f"~ Model [{k}]")
zi_k = zi_ if zi_.ndim == 1 else take_col_fn(zi_, k)
xtzt_k = _slice_test((xt_, zt_), k) if xt_ is not None else None
zpmzpv_k = _slice_pred(zpmzpv, k) if zpmzpv is not None else None
loo_k = _slice_loo(loo_res, k) if loo_res is not None else None
gp.modeldiagnosis.perf(
self.models[k]["model"],
xi_,
zi_k,
loo=loo,
loo_res=loo_k,
xtzt=xtzt_k,
zpmzpv=zpmzpv_k,
)
# --------------------------------------------------------------------------
# Selection-criterion visualization & stats (per-output wrappers)
# --------------------------------------------------------------------------
def plot_selection_criterion_crosssections(
self,
*,
model_index: int = 0,
param_box=None,
param_box_pooled=None,
n_points: int = 100,
param_names=None,
delta: float = 5.0,
pooled: bool = False,
ind=None,
ind_pooled=None,
):
"""Wrapper around `gp.modeldiagnosis.plot_selection_criterion_crosssections`."""
info = self.models[model_index]["info"]
if info is None:
raise ValueError("Run select_params() first to populate info.")
gp.modeldiagnosis.plot_selection_criterion_crosssections(
info=info,
selection_criterion=None,
selection_criteria=None,
covparam=None,
n_points=n_points,
param_names=param_names,
criterion_name="selection criterion",
criterion_names=None,
criterion_name_full="Cross sections for negative log restricted likelihood",
ind=ind,
ind_pooled=ind_pooled if pooled else None,
param_box=param_box if not pooled else None,
param_box_pooled=param_box_pooled if pooled else None,
delta=delta,
)
def plot_selection_criterion_profile_2d(
self,
model_index: int = 0,
param_indices=(0, 1),
param_names=None,
criterion_name="selection criterion",
):
"""Wrapper around `gp.modeldiagnosis.plot_selection_criterion_2d`."""
info = self.models[model_index]["info"]
if info is None:
raise ValueError("Run select_params() first to populate info.")
gp.modeldiagnosis.plot_selection_criterion_2d(
self.models[model_index]["model"],
info,
param_indices=param_indices,
param_names=param_names,
criterion_name=criterion_name,
)
def selection_criterion_stats_fast(
self,
xi,
*,
model_index: int = 0,
ind=None,
param_box=None,
delta=5.0,
n_points=250,
verbose=False,
):
"""Wrapper around `gp.modeldiagnosis.selection_criterion_statistics_fast`."""
info = self.models[model_index]["info"]
if info is None:
raise ValueError("Run select_params() first to populate info.")
return gp.modeldiagnosis.selection_criterion_statistics_fast(
info=info,
model=self.models[model_index]["model"],
xi=gnp.asarray(xi),
ind=ind,
param_box=param_box,
delta=delta,
n_points=n_points,
verbose=verbose,
)
def selection_criterion_stats(
self,
xi,
*,
model_index: int = 0,
ind=None,
param_box=None,
delta=5.0,
verbose=False,
):
"""Wrapper around `gp.modeldiagnosis.selection_criterion_statistics`."""
info = self.models[model_index]["info"]
if info is None:
raise ValueError("Run select_params() first to populate info.")
return gp.modeldiagnosis.selection_criterion_statistics(
info=info,
model=self.models[model_index]["model"],
xi=gnp.asarray(xi),
ind=ind,
param_box=param_box,
delta=delta,
verbose=verbose,
)
# --------------------------------------------------------------------------
# Parameter posterior sampling
# --------------------------------------------------------------------------
def sample_parameters(
self,
method="nuts",
model_indices=None,
init_box=None,
sampling_box=None,
**kwargs,
):
"""Sample GP parameters from the stored selection criterion.
Parameters
----------
method : {"mh", "hmc", "nuts", "smc"}, default "nuts"
Sampling method.
- "mh": Metropolis-Hastings sampler
- "hmc"/"nuts": NUTS-based Hamiltonian sampling
- "smc": Sequential Monte Carlo sampler
model_indices : list of int, optional
Indices of models to sample. Defaults to all models.
init_box : list, optional
Initialization box.
- Required for method="smc" (particle initialization domain).
- For "mh"/"hmc"/"nuts", used only when random_init=True.
sampling_box : list, optional
Domain box used to truncate the target criterion during sampling
(outside points get log-prob = -inf). If None, sampling is unbounded.
**kwargs
Extra arguments passed to the selected gpmp sampler:
- gpmp.mcmc.param_posterior.sample_from_selection_criterion_mh
- gpmp.mcmc.param_posterior.sample_from_selection_criterion_nuts
- gpmp.mcmc.param_posterior.sample_from_selection_criterion_smc
Returns
-------
dict
Dictionary mapping model index to:
- method="mh": {"samples": ..., "mh": ..., "criterion_values": ...}
- method="hmc"/"nuts": {"samples": ..., "nuts": ...}
- method="smc": {"particles": ..., "smc": ...}
"""
method = str(method).lower()
if method not in {"mh", "hmc", "nuts", "smc", "svgd"}:
raise ValueError(
"method must be one of: 'mh', 'hmc', 'nuts', 'smc', 'svgd'."
)
if model_indices is None:
model_indices = list(range(self.output_dim))
results = {}
for idx in model_indices:
model_info = self.models[idx].get("info")
if model_info is None:
raise ValueError(
f"Model {idx} missing 'info'. Run select_params() first."
)
if method == "mh":
from gpmp.mcmc.param_posterior import sample_from_selection_criterion_mh
samples, mh = sample_from_selection_criterion_mh(
info=model_info,
init_box=init_box,
sampling_box=sampling_box,
**kwargs,
)
log_target_values = gp.mcmc.get_log_target_values(
mh, discard_burnin=False
)
criterion_values = -log_target_values[:, mh.burnin_period :]
results[idx] = {
"samples": samples,
"mh": mh,
"criterion_values": criterion_values,
}
elif method in {"hmc", "nuts"}:
from gpmp.mcmc.param_posterior import (
sample_from_selection_criterion_nuts,
)
# Backward-compatible aliases for users migrating from MH-style kwargs.
nuts_kwargs = dict(kwargs)
if "n_steps_total" in nuts_kwargs and "num_samples" not in nuts_kwargs:
nuts_kwargs["num_samples"] = nuts_kwargs.pop("n_steps_total")
if "burnin_period" in nuts_kwargs and "num_warmup" not in nuts_kwargs:
nuts_kwargs["num_warmup"] = nuts_kwargs.pop("burnin_period")
if "show_progress" in nuts_kwargs and "progress" not in nuts_kwargs:
nuts_kwargs["progress"] = nuts_kwargs.pop("show_progress")
if "silent" in nuts_kwargs:
silent = bool(nuts_kwargs.pop("silent"))
nuts_kwargs.setdefault("verbose", 0 if silent else 1)
nuts_kwargs.setdefault("progress", not silent)
nuts_kwargs.pop("n_pool", None)
samples, nuts_info = sample_from_selection_criterion_nuts(
info=model_info,
init_box=init_box,
sampling_box=sampling_box,
**nuts_kwargs,
)
key = "hmc" if method == "hmc" else "nuts"
results[idx] = {"samples": samples, key: nuts_info}
elif method == "smc":
from gpmp.mcmc.param_posterior import (
sample_from_selection_criterion_smc,
)
if init_box is None:
raise ValueError("init_box must be provided when method='smc'.")
particles, smc_instance = sample_from_selection_criterion_smc(
info=model_info,
init_box=init_box,
sampling_box=sampling_box,
**kwargs,
)
results[idx] = {"particles": particles, "smc": smc_instance}
else:
from gpmp.mcmc.param_posterior import (
sample_from_selection_criterion_svgd,
)
particles, svgd_info = sample_from_selection_criterion_svgd(
info=model_info,
init_box=init_box,
sampling_box=sampling_box,
**kwargs,
)
criterion_values = -gnp.asarray(svgd_info["log_prob_final"])
results[idx] = {
"particles": particles,
"svgd": svgd_info,
"criterion_values": criterion_values,
}
return results
# --------------------------------------------------------------------------
# Serialization
# --------------------------------------------------------------------------
def _filter_info(self, info):
"""Drop non-pickleable entries from an `info` dict/object clone."""
if info is None:
return None
info_copy = info.copy()
# Remove keys that are functions or other non-pickleable objects.
for key in list(info_copy.keys()):
if callable(info_copy[key]):
del info_copy[key]
return info_copy
def get_state(self):
"""Return a serializable snapshot of parameters and (filtered) info."""
state = {"models": []}
for m in self.models:
state["models"].append(
{
"meanparam": (
gnp.copy(m["model"].meanparam)
if m["model"].meanparam is not None
else None
),
"covparam": (
gnp.copy(m["model"].covparam)
if m["model"].covparam is not None
else None
),
"info": self._filter_info(m["info"]),
}
)
return state
def set_state(self, state):
"""Restore parameters and info from `get_state()` output."""
for m, m_state in zip(self.models, state.get("models", [])):
if m_state["meanparam"] is not None:
m["model"].meanparam = m_state["meanparam"]
if m_state["covparam"] is not None:
m["model"].covparam = m_state["covparam"]
m["info"] = m_state.get("info", None)
def save_state(self, filename):
"""Serialize model state to disk (pickle)."""
from pickle import dump
state = self.get_state()
with open(filename, "wb") as f:
dump(state, f)
def load_state(self, filename):
"""Load model state from disk (pickle)."""
from pickle import load
with open(filename, "rb") as f:
state = load(f)
self.set_state(state)
# --------------------------------------------------------------------------
# Helpers
# --------------------------------------------------------------------------
# .................................................... parameters and bounds
@staticmethod
def _split_mean_cov_from_vector(p, mpl):
p = gnp.asarray(p).reshape(-1)
return p[:mpl], p[mpl:]
@staticmethod
def _resolve_param0_init_for_output(
model_dict,
xi,
zi_col,
force_param_initial_guess,
param0_entry=None,
):
"""Resolve per-output initializer vectors used by ``select_params``."""
mpl = int(model_dict["mean_paramlength"])
if param0_entry is not None:
meanparam0, covparam0 = ModelContainer._param_to_vector_pair(
param0_entry, model_dict, mpl
)
else:
if model_dict["model"].covparam is None or force_param_initial_guess:
igp = model_dict["parameters_initial_guess_procedure"]
if mpl == 0:
meanparam0 = gnp.array([])
covparam0 = igp(model_dict["model"], xi, zi_col)
else:
meanparam0, covparam0 = igp(model_dict["model"], xi, zi_col)
else:
meanparam0 = model_dict["model"].meanparam
covparam0 = model_dict["model"].covparam
meanparam0 = gnp.asarray(meanparam0).reshape(-1)
covparam0 = gnp.asarray(covparam0).reshape(-1)
param0_init = gnp.concatenate((meanparam0, covparam0))
return meanparam0, covparam0, param0_init
@staticmethod
def _param_to_vector_pair(p, model_dict, mpl):
"""Accept Param or vector; return (mean0, cov0) in normalized space.
Parameters
----------
p : Param | array_like
Either a Param object or a 1D vector containing [mean..., cov...].
model_dict : dict
Per-output model record (AttrDict/dict) containing adapters.
Expected keys:
- "param_to_vectors": callable, mapping Param -> (meanparam, covparam)
mpl : int
Mean-parameter length.
Returns
-------
(mean0, cov0) : (array, array)
1D arrays in normalized coordinates.
"""
# Param object path: rely on the container-installed adapter
if hasattr(p, "get_by_path") and callable(getattr(p, "get_by_path")):
pt = model_dict.get("param_to_vectors", None)
if pt is None or not callable(pt):
raise ValueError(
"param_to_vectors adapter is required to use a Param initializer."
)
mean0, cov0 = pt(p)
mean0 = gnp.asarray(mean0).reshape(-1)
cov0 = gnp.asarray(cov0).reshape(-1)
if mean0.shape[0] != int(mpl):
raise ValueError(
f"Param initializer produced mean length {mean0.shape[0]}, expected {int(mpl)}."
)
return mean0, cov0
# Vector path: split [mean..., cov...]
return ModelContainer._split_mean_cov_from_vector(p, mpl)
@staticmethod
def _bounds_from_param_obj(P):
"""Build (n_params, 2) array from Param.bounds (normalized)."""
any_bound = False
neg_inf = -float("inf")
pos_inf = float("inf")
out = []
for b in P.bounds:
if b is None:
out.append((neg_inf, pos_inf))
else:
any_bound = True
out.append((float(b[0]), float(b[1])))
return None if not any_bound else gnp.asarray(out, dtype=float)
@staticmethod
def _as_bounds_array(b, n_params):
if b is None:
return None
b = gnp.asarray(b, dtype=float)
if b.ndim != 2 or b.shape[1] != 2:
raise ValueError("bounds must have shape (n_params, 2).")
if b.shape[0] != n_params:
raise ValueError(f"bounds has {b.shape[0]} rows, expected {n_params}.")
return b
@staticmethod
def _extract_bounds_for_output(bounds_arg, i, n_params, output_dim=None):
"""
Extract bounds for output i and validate shape.
Parameters
----------
bounds_arg : None | array_like(n_params, 2) | list[array_like(n_params, 2)]
Either a single bounds array to broadcast to all outputs, or a list of
per-output bounds arrays.
i : int
Output index.
n_params : int
Expected number of parameters for output i.
output_dim : int | None
Required when bounds_arg is a list. If None, no length check is done.
Returns
-------
ndarray | None
Bounds array of shape (n_params, 2) or None.
"""
if bounds_arg is None:
return None
if isinstance(bounds_arg, list):
if output_dim is not None and len(bounds_arg) != int(output_dim):
raise ValueError(
"When bounds is a list, its length must equal output_dim."
)
return ModelContainer._as_bounds_array(bounds_arg[i], n_params)
return ModelContainer._as_bounds_array(bounds_arg, n_params)
@staticmethod
def _interval_around(p0, delta):
"""Return (n_params, 2) array [p0 - d, p0 + d]."""
p0 = gnp.asarray(p0, dtype=float).reshape(-1)
if gnp.isscalar(delta):
d = gnp.ones_like(p0) * float(delta)
else:
d = gnp.asarray(delta, dtype=float).reshape(-1)
if d.shape[0] != p0.shape[0]:
raise ValueError("bounds_delta length must match number of parameters.")
lo = p0 - d
hi = p0 + d
return gnp.stack([lo, hi], axis=1)
@staticmethod
def _intersect_bounds(a, b):
"""Intersect two (n_params, 2) arrays. If one is None, return the other."""
if a is None:
return b
if b is None:
return a
lo = gnp.maximum(a[:, 0], b[:, 0])
hi = gnp.minimum(a[:, 1], b[:, 1])
return gnp.stack([lo, hi], axis=1)
@staticmethod
def _apply_bounds_to_param(param_obj, bounds_array, mpl):
"""
Copy optimizer bounds (normalized space) into Param.bounds,
respecting the ordering [mean..., cov...].
"""
if bounds_array is None or param_obj is None:
return
# Collect indices in Param that correspond to mean then cov
idx_mean = []
idx_cov = []
if hasattr(param_obj, "indices_by_path_prefix"):
idx_mean = param_obj.indices_by_path_prefix(["meanparam"])
idx_cov = param_obj.indices_by_path_prefix(["covparam"])
else:
# Fallback: assume the Param was built as [mean..., cov...] in order
idx_mean = list(range(mpl))
idx_cov = list(range(mpl, mpl + (bounds_array.shape[0] - mpl)))
seq = idx_mean + idx_cov
if len(seq) != bounds_array.shape[0]:
# As a stricter fallback, map first mpl to mean and the rest to cov
seq = list(range(bounds_array.shape[0]))
# Ensure bounds list exists and is sized
if not hasattr(param_obj, "bounds") or len(param_obj.bounds) < len(seq):
return # nothing we can safely do
for pos, idx in enumerate(seq):
lo, hi = bounds_array[pos]
# Store as a tuple of floats; if unbounded, they will be +/- inf
param_obj.bounds[idx] = (float(lo), float(hi))
# ................................................................ check data
def _ensure_shapes_and_type(self, xi=None, zi=None, xt=None, convert=True):
"""
Validate shapes and (optionally) convert arrays to backend type.
Returns
-------
(xi, zi, xt) : arrays with correct shapes/types.
"""
if xi is not None:
xi = gnp.asarray(xi) if convert else xi
if getattr(xi, "ndim", None) != 2:
raise ValueError("xi must be a 2D array")
if zi is not None:
zi = gnp.asarray(zi) if convert else zi
if zi.ndim == 1:
if self.output_dim != 1:
raise ValueError("zi provided as 1D array, but output_dim != 1")
zi = zi.reshape(-1, 1)
else:
if zi.ndim != 2:
raise ValueError("zi must be a 1D or 2D array")
if zi.shape[1] != self.output_dim:
raise ValueError("zi must have output_dim columns")
if xt is not None:
xt = gnp.asarray(xt) if convert else xt
if getattr(xt, "ndim", None) != 2:
raise ValueError("xt must be a 2D array")
if xi is not None and zi is not None and xi.shape[0] != zi.shape[0]:
raise ValueError("Number of rows in xi must equal number of rows in zi")
if xi is not None and xt is not None and xi.shape[1] != xt.shape[1]:
raise ValueError("xi and xt must have the same number of columns")
return xi, zi, xt
# ==============================================================================
# Mean functions
# ==============================================================================
def mean_parameterized_constant(x, param):
return param * gnp.ones((x.shape[0], 1))
[docs]
def mean_linpred_constant(x, param):
"""Constant mean function for Gaussian Process models, linear predictor type.
Parameters
----------
x : ndarray(n, d)
Input data points in dimension d.
param : ndarray
Parameters of the mean function (unused in constant mean).
Returns
-------
ndarray
Array of ones with shape (n, 1).
"""
return gnp.ones((x.shape[0], 1))
[docs]
def mean_linpred_linear(x, param):
"""Linear mean function for Gaussian Process models, linear predictor type.
Parameters
----------
x : ndarray(n, d)
Input data points in dimension d.
param : ndarray
Parameters of the mean function (unused in linear mean).
Returns
-------
ndarray
Matrix [1, x_[1,1], ..., x_[1, d]
1, x_[2,1], ..., x_[2, d]
...
1, x_[n,1], ..., x_[n, d]] ]
"""
return gnp.hstack((gnp.ones((x.shape[0], 1)), gnp.asarray(x)))