Skip to content

kriging_ny

This is the Kriging surrogate model. It is based on the DACE matlab toolbox. It can handle numerical and categorical variables.

Kriging

Kriging class with optional Nyström approximation for scalability. This class implements the Kriging surrogate model, also known as Gaussian Process regression. It is adapted to handle both numerical (ordered) and categorical (factor) variables, a key feature of spotpython. The Nyström approximation is added as an optional feature to handle large datasets efficiently.

Source code in spotpython/surrogate/kriging_ny.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
class Kriging:
    """
    Kriging class with optional Nyström approximation for scalability.
    This class implements the Kriging surrogate model, also known as
    Gaussian Process regression. It is adapted to handle both numerical
    (ordered) and categorical (factor) variables, a key feature of spotpython.
    The Nyström approximation is added as an optional feature to handle
    large datasets efficiently.
    """

    def __init__(self, fun_control, n_theta=None, theta=None, p=2.0, corr="squared_exponential", isotropic=False, approximation="None", n_landmarks=100):
        """
        Initialize the Kriging model.

        Args:
            fun_control (dict): Control dictionary from spotpython, containing
                                problem dimensions, variable types ('var_type'), etc.
            n_theta (int, optional): Number of correlation parameters (theta).
                                     Defaults to problem dimension for anisotropic model.
            theta (np.ndarray, optional): Initial correlation parameters.
                                          Defaults to 0.1 for all dimensions.
            p (float, optional): Power for the correlation function. Defaults to 2.0.
            corr (str, optional): Correlation function type.
                                  Defaults to "squared_exponential".
            isotropic (bool, optional): Whether to use an isotropic model (one theta
                                        for all dimensions). Defaults to False.
            approximation (str, optional): Type of approximation to use.
                                           "None" for standard Kriging,
                                           "nystroem" for Nyström approximation.
                                           Defaults to "None".
            n_landmarks (int, optional): Number of landmark points for Nyström.
                                         Only used if approximation="nystroem".
                                         Defaults to 100.
        """
        self.fun_control = fun_control
        self.dim = self.fun_control["lower"].shape
        self.p = p
        self.corr = corr
        self.isotropic = isotropic
        self.approximation = approximation
        self.n_landmarks = n_landmarks

        # Setup masks for variable types
        self.factor_mask = self.fun_control["var_type"] == "factor"
        self.ordered_mask = ~self.factor_mask

        # Determine number of theta parameters
        if self.isotropic:
            self.n_theta = 1
        elif n_theta is None:
            self.n_theta = self.dim
        else:
            self.n_theta = n_theta

        # Initialize theta
        if theta is None:
            self.theta = np.full(self.n_theta, 0.1)
        else:
            self.theta = theta

        # Model state attributes
        self.X_ = None
        self.y_ = None
        self.L_ = None  # Cholesky factor for standard Kriging
        self.alpha_ = None  # Solved term for standard Kriging

        # Nyström-specific attributes
        self.landmarks_ = None
        self.W_cho_ = None  # Cholesky factor of W matrix
        self.nystrom_alpha_ = None  # Solved term for Nyström prediction

    def fit(self, X, y):
        """
        Fit the Kriging model to the training data.

        Args:
            X (np.ndarray): Training data of shape (n_samples, n_features).
            y (np.ndarray): Target values of shape (n_samples,).
        """
        self.X_ = X
        self.y_ = y
        n_samples = X.shape

        if self.approximation.lower() == "nystroem":
            if n_samples <= self.n_landmarks:
                # Fallback to standard Kriging if not enough samples
                self._fit_standard(X, y)
            else:
                self._fit_nystrom(X, y)
        else:
            self._fit_standard(X, y)

    def _fit_standard(self, X, y):
        """Standard Kriging fitting procedure."""
        # Build the full covariance matrix Psi
        Psi = self.build_Psi(X, X)
        Psi[np.diag_indices_from(Psi)] += 1e-8  # Add jitter for stability

        try:
            # Compute Cholesky decomposition
            self.L_ = cholesky(Psi, lower=True)
            # Solve for alpha = L'\(L\y)
            self.alpha_ = cho_solve((self.L_, True), y)
        except np.linalg.LinAlgError:
            # Fallback to pseudo-inverse if Cholesky fails
            pi_Psi = np.linalg.pinv(Psi)
            self.alpha_ = np.dot(pi_Psi, y)
            self.L_ = None  # Indicate that Cholesky failed

    def _fit_nystrom(self, X, y):
        """Nyström approximation fitting procedure."""
        n_samples = X.shape

        # 1. Select landmark points using uniform random sampling without replacement
        landmark_indices = np.random.choice(n_samples, self.n_landmarks, replace=False)
        self.landmarks_ = X[landmark_indices, :]

        # 2. Construct core matrices using build_Psi
        # W = K_mm (landmark-landmark covariance)
        W = self.build_Psi(self.landmarks_, self.landmarks_)
        W += 1e-8  # Add jitter

        # C = K_nm (data-landmark cross-covariance)
        C = self.build_Psi(X, self.landmarks_)

        # 3. Compute Cholesky decomposition of W
        try:
            self.W_cho_ = cholesky(W, lower=True)
        except np.linalg.LinAlgError:
            self.W_cho_ = None
            # Fallback to standard Kriging as a safe option
            self._fit_standard(X, y)
            return

        # 4. Pre-compute terms for prediction
        # Solve for nystrom_alpha = W_inv * C.T * y
        Ct_y = C.T @ y
        self.nystrom_alpha_ = cho_solve((self.W_cho_, True), Ct_y)

    def predict(self, X_star):
        """
        Make predictions with the fitted Kriging model.

        Args:
            X_star (np.ndarray): Test data of shape (n_test_samples, n_features).

        Returns:
            tuple: A tuple containing:
                - y_pred (np.ndarray): Predicted mean values.
                - y_mse (np.ndarray): Mean squared error (predictive variance).
        """
        if self.approximation.lower() == "nystroem" and self.landmarks_ is not None:
            return self._predict_nystrom(X_star)
        else:
            return self._predict_standard(X_star)

    def _predict_standard(self, X_star):
        """Standard Kriging prediction procedure."""
        # Build cross-covariance vector/matrix psi
        psi = self.build_Psi(X_star, self.X_)

        # Predictive mean
        y_pred = psi @ self.alpha_

        # Predictive variance
        if self.L_ is not None:
            v = solve_triangular(self.L_, psi.T, lower=True)
            y_mse = 1.0 - np.sum(v**2, axis=0)
            y_mse[y_mse < 0] = 0
        else:
            pi_Psi = np.linalg.pinv(self.build_Psi(self.X_, self.X_) + 1e-8 * np.eye(self.X_.shape))
            y_mse = 1.0 - np.sum((psi @ pi_Psi) * psi, axis=1)
            y_mse[y_mse < 0] = 0

        return y_pred, y_mse.reshape(-1, 1)

    def _predict_nystrom(self, X_star):
        """Nyström approximation prediction procedure."""
        # 1. Compute cross-covariance between test points and landmarks
        psi_star_m = self.build_Psi(X_star, self.landmarks_)

        # 2. Predictive mean
        y_pred = psi_star_m @ self.nystrom_alpha_

        # 3. Predictive variance
        if self.W_cho_ is not None:
            v = cho_solve((self.W_cho_, True), psi_star_m.T)
            quad_term = np.sum(psi_star_m * v.T, axis=1)
            y_mse = 1.0 - quad_term
            y_mse[y_mse < 0] = 0
        else:
            y_mse = np.ones(X_star.shape)  # Return max uncertainty

        return y_pred, y_mse.reshape(-1, 1)

    def build_Psi(self, X1, X2):
        """Builds the covariance matrix Psi between two sets of points."""
        n1 = X1.shape
        Psi = np.zeros((n1, X2.shape))
        for i in range(n1):
            Psi[i, :] = self.build_psi_vec(X1[i, :], X2)
        return Psi

    def build_psi_vec(self, x, X_):
        """
        Builds a covariance vector between a point x and a set of points X_.
        This method correctly handles mixed (ordered/factor) variable types.
        """
        # Handle theta for isotropic vs. anisotropic cases
        if self.isotropic:
            theta10 = np.full(self.dim, 10**self.theta)
        else:
            theta10 = 10**self.theta

        D = np.zeros(X_.shape)

        # Compute ordered distance contributions
        if self.ordered_mask.any():
            X_ordered = X_[:, self.ordered_mask]
            x_ordered = x[self.ordered_mask]
            D += cdist(x_ordered.reshape(1, -1), X_ordered, metric="sqeuclidean", w=theta10[self.ordered_mask]).ravel()

        # Compute factor distance contributions
        if self.factor_mask.any():
            X_factor = X_[:, self.factor_mask]
            x_factor = x[self.factor_mask]
            # Hamming distance for factors
            D += cdist(x_factor.reshape(1, -1), X_factor, metric="hamming", w=theta10[self.factor_mask]).ravel() * self.factor_mask.sum()

        # Apply correlation function
        if self.corr == "squared_exponential":
            psi = np.exp(-D)
        else:
            # Fallback for other potential correlation functions
            psi = np.exp(-(D**self.p))

        return psi

__init__(fun_control, n_theta=None, theta=None, p=2.0, corr='squared_exponential', isotropic=False, approximation='None', n_landmarks=100)

Initialize the Kriging model.

Parameters:

Name Type Description Default
fun_control dict

Control dictionary from spotpython, containing problem dimensions, variable types (‘var_type’), etc.

required
n_theta int

Number of correlation parameters (theta). Defaults to problem dimension for anisotropic model.

None
theta ndarray

Initial correlation parameters. Defaults to 0.1 for all dimensions.

None
p float

Power for the correlation function. Defaults to 2.0.

2.0
corr str

Correlation function type. Defaults to “squared_exponential”.

'squared_exponential'
isotropic bool

Whether to use an isotropic model (one theta for all dimensions). Defaults to False.

False
approximation str

Type of approximation to use. “None” for standard Kriging, “nystroem” for Nyström approximation. Defaults to “None”.

'None'
n_landmarks int

Number of landmark points for Nyström. Only used if approximation=”nystroem”. Defaults to 100.

100
Source code in spotpython/surrogate/kriging_ny.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def __init__(self, fun_control, n_theta=None, theta=None, p=2.0, corr="squared_exponential", isotropic=False, approximation="None", n_landmarks=100):
    """
    Initialize the Kriging model.

    Args:
        fun_control (dict): Control dictionary from spotpython, containing
                            problem dimensions, variable types ('var_type'), etc.
        n_theta (int, optional): Number of correlation parameters (theta).
                                 Defaults to problem dimension for anisotropic model.
        theta (np.ndarray, optional): Initial correlation parameters.
                                      Defaults to 0.1 for all dimensions.
        p (float, optional): Power for the correlation function. Defaults to 2.0.
        corr (str, optional): Correlation function type.
                              Defaults to "squared_exponential".
        isotropic (bool, optional): Whether to use an isotropic model (one theta
                                    for all dimensions). Defaults to False.
        approximation (str, optional): Type of approximation to use.
                                       "None" for standard Kriging,
                                       "nystroem" for Nyström approximation.
                                       Defaults to "None".
        n_landmarks (int, optional): Number of landmark points for Nyström.
                                     Only used if approximation="nystroem".
                                     Defaults to 100.
    """
    self.fun_control = fun_control
    self.dim = self.fun_control["lower"].shape
    self.p = p
    self.corr = corr
    self.isotropic = isotropic
    self.approximation = approximation
    self.n_landmarks = n_landmarks

    # Setup masks for variable types
    self.factor_mask = self.fun_control["var_type"] == "factor"
    self.ordered_mask = ~self.factor_mask

    # Determine number of theta parameters
    if self.isotropic:
        self.n_theta = 1
    elif n_theta is None:
        self.n_theta = self.dim
    else:
        self.n_theta = n_theta

    # Initialize theta
    if theta is None:
        self.theta = np.full(self.n_theta, 0.1)
    else:
        self.theta = theta

    # Model state attributes
    self.X_ = None
    self.y_ = None
    self.L_ = None  # Cholesky factor for standard Kriging
    self.alpha_ = None  # Solved term for standard Kriging

    # Nyström-specific attributes
    self.landmarks_ = None
    self.W_cho_ = None  # Cholesky factor of W matrix
    self.nystrom_alpha_ = None  # Solved term for Nyström prediction

build_Psi(X1, X2)

Builds the covariance matrix Psi between two sets of points.

Source code in spotpython/surrogate/kriging_ny.py
207
208
209
210
211
212
213
def build_Psi(self, X1, X2):
    """Builds the covariance matrix Psi between two sets of points."""
    n1 = X1.shape
    Psi = np.zeros((n1, X2.shape))
    for i in range(n1):
        Psi[i, :] = self.build_psi_vec(X1[i, :], X2)
    return Psi

build_psi_vec(x, X_)

Builds a covariance vector between a point x and a set of points X_. This method correctly handles mixed (ordered/factor) variable types.

Source code in spotpython/surrogate/kriging_ny.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def build_psi_vec(self, x, X_):
    """
    Builds a covariance vector between a point x and a set of points X_.
    This method correctly handles mixed (ordered/factor) variable types.
    """
    # Handle theta for isotropic vs. anisotropic cases
    if self.isotropic:
        theta10 = np.full(self.dim, 10**self.theta)
    else:
        theta10 = 10**self.theta

    D = np.zeros(X_.shape)

    # Compute ordered distance contributions
    if self.ordered_mask.any():
        X_ordered = X_[:, self.ordered_mask]
        x_ordered = x[self.ordered_mask]
        D += cdist(x_ordered.reshape(1, -1), X_ordered, metric="sqeuclidean", w=theta10[self.ordered_mask]).ravel()

    # Compute factor distance contributions
    if self.factor_mask.any():
        X_factor = X_[:, self.factor_mask]
        x_factor = x[self.factor_mask]
        # Hamming distance for factors
        D += cdist(x_factor.reshape(1, -1), X_factor, metric="hamming", w=theta10[self.factor_mask]).ravel() * self.factor_mask.sum()

    # Apply correlation function
    if self.corr == "squared_exponential":
        psi = np.exp(-D)
    else:
        # Fallback for other potential correlation functions
        psi = np.exp(-(D**self.p))

    return psi

fit(X, y)

Fit the Kriging model to the training data.

Parameters:

Name Type Description Default
X ndarray

Training data of shape (n_samples, n_features).

required
y ndarray

Target values of shape (n_samples,).

required
Source code in spotpython/surrogate/kriging_ny.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def fit(self, X, y):
    """
    Fit the Kriging model to the training data.

    Args:
        X (np.ndarray): Training data of shape (n_samples, n_features).
        y (np.ndarray): Target values of shape (n_samples,).
    """
    self.X_ = X
    self.y_ = y
    n_samples = X.shape

    if self.approximation.lower() == "nystroem":
        if n_samples <= self.n_landmarks:
            # Fallback to standard Kriging if not enough samples
            self._fit_standard(X, y)
        else:
            self._fit_nystrom(X, y)
    else:
        self._fit_standard(X, y)

predict(X_star)

Make predictions with the fitted Kriging model.

Parameters:

Name Type Description Default
X_star ndarray

Test data of shape (n_test_samples, n_features).

required

Returns:

Name Type Description
tuple

A tuple containing: - y_pred (np.ndarray): Predicted mean values. - y_mse (np.ndarray): Mean squared error (predictive variance).

Source code in spotpython/surrogate/kriging_ny.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def predict(self, X_star):
    """
    Make predictions with the fitted Kriging model.

    Args:
        X_star (np.ndarray): Test data of shape (n_test_samples, n_features).

    Returns:
        tuple: A tuple containing:
            - y_pred (np.ndarray): Predicted mean values.
            - y_mse (np.ndarray): Mean squared error (predictive variance).
    """
    if self.approximation.lower() == "nystroem" and self.landmarks_ is not None:
        return self._predict_nystrom(X_star)
    else:
        return self._predict_standard(X_star)