# --------------------------------------------------------------
# Authors: Emmanuel Vazquez <emmanuel.vazquez@centralesupelec.fr>
# Sébastien Petit
# Copyright (c) 2023, CentraleSupelec
# License: GPLv3 (see LICENSE)
# --------------------------------------------------------------
"""
Relaxed Gaussian process (reGP) utilities for threshold-oriented modeling.
This module implements the reGP procedure used in `examples/example20_regp.py`:
start from an initial GP model, relax observations in a target interval, and
update the covariance parameters jointly with relaxed values to improve
behavior in a region of interest.
Typical use
-----------
1. Define a relaxation interval `R` (for example `[[u, +inf]]`).
2. Call `predict(...)` (or `remodel(...)`) to obtain relaxed observations and
posterior predictions.
3. Optionally call `select_optimal_threshold_above_t0(...)` to choose a
threshold above `t0` with a tCRPS-based criterion.
Defines
-------
get_membership_indices
Assign each observation to a relaxation interval index.
split_data
Split observations into non-relaxed and relaxed subsets.
make_regp_criterion_with_gradient
Build the reGP objective and gradient wrappers for optimization.
remodel
Perform reGP parameter/value optimization (REML + relaxation).
predict
Run reGP remodeling then compute posterior predictions.
select_optimal_threshold_above_t0
Select a relaxation threshold above `t0` by minimizing a tCRPS criterion.
Reference
---------
S. J. Petit, J. Bect, and E. Vazquez (2025).
"Relaxed Gaussian Process Interpolation: a Goal-Oriented Approach to Bayesian Optimization."
Journal of Machine Learning Research, 26(195):1-70.
https://www.jmlr.org/papers/v26/22-0828.html
"""
from scipy.stats import norm
import time
import numpy as np
import gpmp.num as gnp
import gpmp as gp
[docs]
def get_membership_indices(zi, R):
"""
Get the index of the membership of each row in zi to an interval in R.
Parameters
----------
zi : ndarray
An (n, 1) shaped array of data points.
R : list of lists
A list of intervals represented as [l_i, u_i] pairs.
Returns
-------
ei : ndarray
An (n, 1) shaped array of membership indices.
"""
n = zi.shape[0]
ei = np.zeros(n, dtype=int)
for idx, (l, u) in enumerate(R, start=1):
mask = np.logical_and(zi > l, zi <= u)
c = idx * mask
ei = ei + c
return ei
[docs]
def split_data(xi, zi, ei, R):
"""Split the data based on the membership indices.
Parameters
----------
xi : ndarray
An (n, d) shaped array of data points.
zi : ndarray
An (n, 1) shaped array of data points.
ei : ndarray
An (n, 1) shaped array of membership indices.
R : list of intervals
List of intervals, each specified as [l_k, u_k].
Returns
-------
(x0, z0, ind0) : tuple of ndarrays
A tuple of (n0, d), (n0, 1) shaped arrays for xi and zi rows
with ei = 0, and the corresponding indices.
(x1, z1, bounds, ind1) : tuple of ndarrays
Rows of xi and zi with ei > 0, and the corresponding interval
bounds and indices.
"""
mask = ei.reshape(-1) == 0
ind0 = np.where(mask)[0]
ind1 = np.where(~mask)[0]
x0, z0 = xi[mask], zi[mask]
x1, z1, e1 = xi[~mask], zi[~mask], ei[~mask]
interval_indices = e1 - 1
bounds = [R[i] for i in interval_indices]
return (x0, z0, ind0), (x1, z1, bounds, ind1)
[docs]
def make_regp_criterion_with_gradient(model, x0, z0, x1):
"""
Make regp criterion function with gradient.
Parameters
----------
model : gpmp model
Gaussian process model.
x0 : ndarray, shape (n0, d)
Locations of the observed data points not relaxed
z0 : ndarray, shape (n0,)
Observed values at the data points not relaxed
x1 : ndarray, shape (n1, d)
Locations of the relaxed data points
Returns
-------
crit_pre_grad : callable
Criterion wrapper to evaluate objective and cache graph before gradient.
dcrit : callable
Gradient function compatible with SciPy's jac callback.
"""
x0 = gnp.asarray(x0)
x1 = gnp.asarray(x1)
z0 = gnp.asarray(z0)
xi = gnp.vstack((x0, x1))
n1 = x1.shape[0]
# selection criterion
selection_criterion = model.negative_log_restricted_likelihood
def crit_(param):
if n1 > 0:
covparam = param[0:-n1]
z1 = param[-n1:]
elif n1 == 0:
covparam = param
z1 = gnp.array([])
else:
raise ValueError(n1)
zi = gnp.concatenate((z0, z1))
l = selection_criterion(covparam, xi, zi)
return l
crit_diff = gnp.DifferentiableSelectionCriterion(
lambda p, _x, _z: crit_(p),
gnp.array([]),
gnp.array([]),
)
return crit_diff.evaluate_pre_grad, crit_diff.gradient
[docs]
def remodel(
model,
xi,
zi,
R,
covparam0=None,
info=False,
verbosity=0,
convert_in=True,
convert_out=True,
):
"""
Perform reGP optimization (REML + relaxation)
Parameters
----------
model : GPModel
Gaussian process model.
xi : ndarray, shape (n, d)
Locations of the observed data points.
zi : ndarray, shape (n,)
Observed values at the data points.
R : list of intervals
List of relaxation intervals, each specified as [l_k, u_k].
covparam0 : ndarray, optional
Initial guess for the covariance parameters.
info : bool, optional
Whether to return additional information.
verbosity : int, optional
Verbosity level.
convert_in : bool, optional
Whether to convert input arrays to _gpmp_backend_ type or keep as-is.
convert_out : bool, optional
Whether to return numpy arrays or keep _gpmp_backend_ types.
Returns
-------
model : GPmp model
Updated GPmp Gaussian process model.
zi_relaxed : ndarray, shape (n,)
Relaxed output data.
ind_relaxed : ndarray
Indices of the relaxed data points in the input zi array.
info_ret : dict, optional
Additional information (if info=True).
"""
tic = time.time()
xi_, zi_, _ = gp.core.utils.ensure_shapes_and_type(
xi=xi, zi=zi, convert=convert_in
)
# Initial guess for the covariance parameters if not provided
if covparam0 is None:
covparam0 = gp.kernel.anisotropic_parameters_initial_guess(
model, xi_, zi_)
covparam_dim = covparam0.shape[0]
covparam_bounds = [gnp.array([-gnp.inf, gnp.inf])] * covparam0.shape[0]
# Membership indices and split data
ei = get_membership_indices(gnp.to_np(zi_), R)
(x0, z0, ind0), (x1, z1, z1_bounds, ind1) = split_data(xi_, zi_, ei, R)
z1_size = z1.shape[0]
# Initial parameter vector and bounds
p0 = np.concatenate((covparam0, z1.reshape(-1)))
bounds = covparam_bounds + z1_bounds
# reGP criterion
nlrl, dnlrl = make_regp_criterion_with_gradient(model, x0, z0, x1)
# Verbosity level
silent = True
if verbosity == 1:
print("reGP stuff...")
elif verbosity == 2:
silent = False
# Optimize parameters
popt, info_ret = gp.kernel.autoselect_parameters(
p0, nlrl, dnlrl, bounds=bounds, silent=silent, info=True
)
if verbosity == 1:
print("done.")
# Update the model and relaxed data
model.covparam = gnp.asarray(popt[0:covparam_dim])
z1_relaxed = popt[covparam_dim:]
zi_relaxed = gnp.zeros(zi_.shape)
if gnp._gpmp_backend_ == "jax":
zi_relaxed = zi_relaxed.at[ind0].set(gnp.asarray(z0))
zi_relaxed = zi_relaxed.at[ind1].set(gnp.asarray(z1_relaxed))
else:
zi_relaxed[ind0] = gnp.asarray(z0)
zi_relaxed[ind1] = gnp.asarray(z1_relaxed)
# Return results
if info:
info_ret["covparam0"] = covparam0
info_ret["covparam"] = model.covparam
info_ret["selection_criterion"] = nlrl
info_ret["time"] = time.time() - tic
return model, zi_relaxed, ind1, info_ret
else:
return model, zi_relaxed, ind1
[docs]
def predict(model, xi, zi, xt, R, covparam0=None, info=False, verbosity=0):
"""
Perform reGP optimization (REML + relaxation) and prediction
Parameters
----------
model : GPModel
Gaussian process model.
xi : ndarray, shape (n, d)
Locations of the observed data points.
zi : ndarray, shape (n,)
Observed values at the data points.
xt : ndarray, shape (n, d)
Locations of points to be predicted.
R : list of intervals
List of relaxation intervals, each specified as [l_k, u_k].
covparam0 : ndarray, optional
Initial guess for the covariance parameters.
info : bool, optional
Whether to return additional information.
verbosity : int, optional
Verbosity level.
Returns
-------
zi_relaxed : ndarray, shape (n,)
Relaxed output data.
(zpm, zpv) : tuple of ndarrays
Relaxed posterior mean and variance.
model : GPmp model
Updated GPmp Gaussian process model.
info_ret : dict, optional
Additional information (if info=True).
"""
if info is True:
model, zi_relaxed, ind_relaxed, info_ret = remodel(
model, xi, zi, R, covparam0=None, info=info, verbosity=verbosity
)
else:
model, zi_relaxed, ind_relaxed = remodel(
model, xi, zi, R, covparam0=None, info=info, verbosity=verbosity
)
info_ret = None
zpm, zpv = model.predict(xi, zi_relaxed, xt)
return zi_relaxed, (zpm, zpv), model, info_ret
[docs]
def select_optimal_threshold_above_t0(model, xi, zi, t0, G=20):
"""
Choose threshold for reGP with relaxation above t0
This function selects an optimal threshold for a reGP above t0 by
minimizing the truncated continuous ranked probability score
(tCRPS) over a range of possible thresholds.
Parameters
----------
model : GPModel
Gaussian process model.
xi : ndarray, shape (n, d)
Locations of the observed data points.
zi : ndarray, shape (n,)
Observed values at the data points.
t0 : float
Lower limit of the threshold range.
G : int, optional, default: 20
Number of candidate thresholds to evaluate.
Returns
-------
Rgopt : ndarray, shape (1, 2)
Optimal relaxation interval, specified as [threshold, inf].
"""
t = (
gnp.logspace(gnp.log10(t0 - zi.min()),
gnp.log10(gnp.max(zi) - zi.min()), G)
+ zi.min()
)
J = gnp.numpy.zeros(G)
for g in range(G):
Rg = gnp.numpy.array([[t[g], gnp.numpy.inf]])
model, zi_relaxed, _ = remodel(model, xi, zi, Rg)
zloom, zloov, _ = model.loo(xi, zi_relaxed)
tCRPS = gp.misc.scoringrules.tcrps_gaussian(
zloom, gnp.sqrt(zloov), zi_relaxed, a=-gnp.inf, b=t0
)
J[g] = gnp.sum(tCRPS)
gopt = gnp.argmin(gnp.asarray(J))
Rgopt = gnp.numpy.array([[t[gopt], gnp.numpy.inf]])
return Rgopt
# ---------------------------------------