"""
Kriging surrogate with optional Nyström approximation.
"""
import numpy as np
from scipy.spatial.distance import cdist
from scipy.linalg import cholesky, cho_solve, solve_triangular
class Kriging:
def __init__(self, fun_control, n_theta=None, theta=None, p=2.0,
corr="squared_exponential", isotropic=False,
approximation="None", n_landmarks=100):
self.fun_control = fun_control
self.dim = self.fun_control["lower"].shape
self.p = p
self.corr = corr
self.isotropic = isotropic
self.approximation = approximation
self.n_landmarks = n_landmarks
self.factor_mask = self.fun_control["var_type"] == "factor"
self.ordered_mask = ~self.factor_mask
self.n_theta = 1 if isotropic else (n_theta or self.dim)
self.theta = np.full(self.n_theta, 0.1) if theta is None else theta
self.X_, self.y_, self.L_, self.alpha_ = None, None, None, None
self.landmarks_, self.W_cho_, self.nystrom_alpha_ = None, None, None
def fit(self, X, y):
self.X_, self.y_ = X, y
n_samples = X.shape[0]
if self.approximation.lower() == "nystroem" and n_samples > self.n_landmarks:
return self._fit_nystrom(X, y)
return self._fit_standard(X, y)
def _fit_standard(self, X, y):
Psi = self.build_Psi(X, X)
Psi[np.diag_indices_from(Psi)] += 1e-8
try:
self.L_ = cholesky(Psi, lower=True)
self.alpha_ = cho_solve((self.L_, True), y)
except np.linalg.LinAlgError:
self.L_ = None
self.alpha_ = np.linalg.pinv(Psi) @ y
def _fit_nystrom(self, X, y):
n_samples = X.shape[0]
idx = np.random.choice(n_samples, self.n_landmarks, replace=False)
self.landmarks_ = X[idx, :]
W = self.build_Psi(self.landmarks_, self.landmarks_) + 1e-8 * np.eye(self.n_landmarks)
C = self.build_Psi(X, self.landmarks_)
try:
self.W_cho_ = cholesky(W, lower=True)
self.nystrom_alpha_ = cho_solve((self.W_cho_, True), C.T @ y)
except np.linalg.LinAlgError:
self.W_cho_ = None
self._fit_standard(X, y)
def predict(self, X_star):
if self.approximation.lower() == "nystroem" and self.landmarks_ is not None:
return self._predict_nystrom(X_star)
return self._predict_standard(X_star)
def _predict_standard(self, X_star):
psi = self.build_Psi(X_star, self.X_)
y_pred = psi @ self.alpha_
if self.L_ is not None:
v = solve_triangular(self.L_, psi.T, lower=True)
y_mse = 1.0 - np.sum(v**2, axis=0)
else:
Psi = self.build_Psi(self.X_, self.X_) + 1e-8 * np.eye(self.X_.shape[0])
pi_Psi = np.linalg.pinv(Psi)
y_mse = 1.0 - np.sum((psi @ pi_Psi) * psi, axis=1)
y_mse[y_mse < 0] = 0
return y_pred, y_mse.reshape(-1, 1)
def _predict_nystrom(self, X_star):
psi_star_m = self.build_Psi(X_star, self.landmarks_)
y_pred = psi_star_m @ self.nystrom_alpha_
if self.W_cho_ is not None:
v = cho_solve((self.W_cho_, True), psi_star_m.T)
quad = np.sum(psi_star_m * v.T, axis=1)
y_mse = 1.0 - quad
else:
y_mse = np.ones(X_star.shape[0])
y_mse[y_mse < 0] = 0
return y_pred, y_mse.reshape(-1, 1)
def build_Psi(self, X1, X2):
n1 = X1.shape[0]
Psi = np.zeros((n1, X2.shape[0]))
for i in range(n1):
Psi[i, :] = self.build_psi_vec(X1[i, :], X2)
return Psi
def build_psi_vec(self, x, X_):
theta10 = np.full(self.dim, 10**self.theta) if self.isotropic else 10**self.theta
D = np.zeros(X_.shape[0])
if self.ordered_mask.any():
Xo = X_[:, self.ordered_mask]
xo = x[self.ordered_mask]
D += cdist(xo.reshape(1, -1), Xo, metric="sqeuclidean",
w=theta10[self.ordered_mask]).ravel()
if self.factor_mask.any():
Xf = X_[:, self.factor_mask]
xf = x[self.factor_mask]
D += cdist(xf.reshape(1, -1), Xf, metric="hamming",
w=theta10[self.factor_mask]).ravel() * self.factor_mask.sum()
return np.exp(-D) if self.corr == "squared_exponential" else np.exp(-(D**self.p))