import numpy as np
from scipy.spatial.distance import pdist, squareform, cdist
from numpy.linalg import cholesky, solve, LinAlgError
from numpy import spacing, sqrt, exp, multiply, eye, ones
from scipy.optimize import minimize
from scipy.stats import qmc
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.pyplot as plt
from numpy import (exp, multiply, eye, linspace, spacing, sqrt)
from numpy.linalg import cholesky, solve
from scipy.spatial.distance import squareform, pdist, cdist
from scipy.optimize import minimize # Für die Optimierung
# 1. Definition der KrigingRegressor-Klasse
class KrigingRegressor:
"""Ein Kriging-Regressionsmodell mit Hyperparameter-Optimierung.
Attributes:
initial_theta (float): Startwert für den Aktivitätsparameter Theta.
bounds (list): Liste von Tupeln für die Grenzen der Hyperparameter-Optimierung.
opt_theta_ (float): Optimierter Theta-Wert nach dem Fitting.
X_train_ (array): Trainings-Eingabedaten.
y_train_ (array): Trainings-Zielwerte.
U_ (array): Cholesky-Zerlegung der Korrelationsmatrix.
mu_hat_ (float): Geschätzter Mittelwert.
"""
def __init__(self, initial_theta=1.0, bounds=[(0.001, 100.0)]):
self.initial_theta = initial_theta
self.bounds = bounds
def _build_Psi(self, X, log_theta, eps=sqrt(spacing(1))):
"""Berechnet die Korrelationsmatrix Psi.
Args:
X (np.ndarray): Eingabedaten-Matrix
log_theta (float or np.ndarray): Log10-transformierte Theta-Werte
eps (float, optional): Nugget-Term für numerische Stabilität
Returns:
np.ndarray: Korrelationsmatrix Psi
"""
# Konvertiere log_theta zurück zu theta (immer positiv durch exp)
= 10.0**log_theta
theta
if not isinstance(theta, np.ndarray) or theta.ndim == 0:
= np.array([theta])
theta
= squareform(pdist(X, metric='sqeuclidean', w=theta))
D = exp(-D)
Psi += multiply(eye(X.shape[0]), eps)
Psi return Psi
def _build_psi(self, X_train, x_predict, log_theta):
"""Berechnet den Korrelationsvektor psi.
Args:
X_train (np.ndarray): Trainings-Eingabedaten
x_predict (np.ndarray): Vorhersage-Eingabepunkte
log_theta (float or np.ndarray): Log10-transformierte Theta-Werte
Returns:
np.ndarray: Korrelationsvektor psi
"""
# Konvertiere log_theta zurück zu theta
= 10.0**log_theta
theta
if not isinstance(theta, np.ndarray) or theta.ndim == 0:
= np.array([theta])
theta
= cdist(x_predict, X_train, metric='sqeuclidean', w=theta)
D = exp(-D)
psi return psi.T
def _neg_log_likelihood(self, params, X_train, y_train):
"""Berechnet die negative konzentrierte Log-Likelihood."""
= params
theta = X_train.shape[0]
n
try:
= self._build_Psi(X_train, theta)
Psi = cholesky(Psi).T
U except np.linalg.LinAlgError:
return 1e15
= np.ones(n).reshape(-1, 1)
one
# Berechne mu_hat (MLE des Mittelwerts)
= solve(U, solve(U.T, y_train))
Psi_inv_y = solve(U, solve(U.T, one))
Psi_inv_one = (one.T @ Psi_inv_y) / (one.T @ Psi_inv_one)
mu_hat = mu_hat.item()
mu_hat
# Berechne sigma_hat_sq (MLE der Prozessvarianz)
= y_train - one * mu_hat
y_minus_mu_one = (y_minus_mu_one.T @ Psi_inv_y) / n
sigma_hat_sq = sigma_hat_sq.item()
sigma_hat_sq
if sigma_hat_sq < 1e-10:
return 1e15
# Berechne negative konzentrierte Log-Likelihood
= 2 * np.sum(np.log(np.diag(U.T)))
log_det_Psi = 0.5 * (n * np.log(sigma_hat_sq) + log_det_Psi)
nll
return nll
def fit(self, X, y):
"""Fit das Kriging-Modell an die Trainingsdaten.
// ...existing docstring...
"""
self.X_train_ = X
self.y_train_ = y.reshape(-1, 1)
# Optimierung der Hyperparameter
= np.array([self.initial_theta])
initial_theta = minimize(self._neg_log_likelihood,
result
initial_theta,=(self.X_train_, self.y_train_),
args='L-BFGS-B',
method=self.bounds)
bounds
self.opt_theta_ = result.x
# Berechne optimale Parameter für Vorhersagen
= self._build_Psi(self.X_train_, self.opt_theta_)
Psi_opt self.U_ = cholesky(Psi_opt).T
= self.X_train_.shape[0]
n_train = np.ones(n_train).reshape(-1, 1)
one
self.mu_hat_ = (one.T @ solve(self.U_, solve(self.U_.T, self.y_train_))) / \
@ solve(self.U_, solve(self.U_.T, one)))
(one.T self.mu_hat_ = self.mu_hat_.item()
return self
def predict(self, X):
"""Vorhersage für neue Datenpunkte.
Args:
X (array-like): Eingabedaten für Vorhersage der Form (n_samples, n_features)
Returns:
array: Vorhergesagte Werte
"""
= self.X_train_.shape[0]
n_train = np.ones(n_train).reshape(-1, 1)
one # Fix: Use self._build_psi instead of build_psi
= self._build_psi(self.X_train_, X, self.opt_theta_)
psi
return self.mu_hat_ * np.ones(X.shape[0]).reshape(-1, 1) + \
@ solve(self.U_, solve(self.U_.T,
psi.T self.y_train_ - one * self.mu_hat_))
63 Lernmodul: Kriging Projekt
Um die gestellte Aufgabe zu lösen, erstellen wir eine Python-Klasse KrigingRegressor
für das Kriging-Modell, eine Black-Box-Funktion f(x)
, eine Funktion zur Erstellung des initialen Stichprobenplans und implementieren dann den sequenziellen Optimierungsablauf.
1. Die KrigingRegressor
-Klasse Diese Klasse kapselt die Logik für das Kriging-Modell, einschließlich der Berechnung der Korrelationsmatrizen, der Maximum-Likelihood-Schätzung für den globalen Mittelwert \(\hat{\mu}\) und die Prozessvarianz \(\hat{\sigma}^2\), der konzentrierten Log-Likelihood-Funktion zur Hyperparameter-Optimierung sowie der Vorhersagefunktion. Die Optimierung des Aktivitätsparameters \(\vec{\theta}\) erfolgt über die Maximierung der konzentrierten Log-Likelihood-Funktion, wobei intern die negative Log-Likelihood minimiert wird. Der Glattheitsparameter \(\vec{p}\) wird implizit auf \(p_j=2\) gesetzt, da die quadrierte euklidische Distanz verwendet wird. Es wird ein kleiner Nugget-Term (eps
) zur Diagonalen der Korrelationsmatrix hinzugefügt, um die numerische Stabilität zu gewährleisten.
2. Die Black-Box-Funktion f(x) = x² + 1
Diese Funktion simuliert ein teures oder undurchsichtiges System, dessen interne Funktionsweise dem Optimierungsalgorithmus nicht bekannt ist.
3. Initialer Stichprobenplan X
Für einen “optimalen” Versuchsplan wird Latin Hypercube Sampling (LHS) verwendet, da dies eine raumfüllende Eigenschaft aufweist. Dies stellt sicher, dass der Eingaberaum effizient erkundet wird. Die Eingabedaten werden intern auf den Bereich $$ skaliert, um die Konsistenz der \(\theta\)-Werte über verschiedene Probleme hinweg zu gewährleisten.
4. Sequenzieller Optimierungsablauf Der Prozess beginnt mit einem initialen Stichprobenplan. In jeder Iteration wird das Kriging-Modell mit den verfügbaren Daten gefittet. Anschließend wird eine Optimierung auf dem Surrogatmodell (dem gefitteten Kriging-Modell) durchgeführt, um den nächsten vielversprechenden Punkt im Designraum zu finden. Dieser Punkt wird der Black-Box-Funktion übergeben, und die erhaltene Beobachtung wird zum Trainingsdatensatz hinzugefügt. Dieser iterative Prozess wird bis zu einer maximalen Anzahl von Funktionsauswertungen fortgesetzt.
Hier ist der entsprechende Python-Code:
64 2. Die Black-Box-Funktion
def f_black_box(x):
"""Analytische Black-Box-Funktion: f(x)"""
return -x**2*np.cos(x) + 1 + x**2*np.sin(x)
# 3. Funktion zur Erstellung des initialen Stichprobenplans (Latin Hypercube Sampling)
def create_initial_design(n_points, dimensionality, x_range):
"""Erstellt einen raumfüllenden initialen Versuchsplan mittels Latin Hypercube Sampling.
Args:
n_points (int): Anzahl der Designpunkte
dimensionality (int): Dimension des Eingaberaums
x_range (tuple): Tuple mit (unterer_grenze, oberer_grenze) für den Wertebereich
Returns:
np.ndarray: Matrix der Designpunkte der Form (n_points, dimensionality)
"""
# Verwende einen Integer als Seed statt SeedSequence
= qmc.LatinHypercube(d=dimensionality, seed=1234)
sampler
# Generiere Samples im Einheitswürfel [0,1]^d
= sampler.random(n=n_points)
lhs_samples_unit_cube
# Extrahiere die Grenzen
= x_range
x_lower, x_upper
# Skaliere die Samples vom Einheitswürfel auf den gewünschten Bereich
= x_lower + (x_upper - x_lower) * lhs_samples_unit_cube
X_initial
# Stelle sicher, dass das Output ein 2D Array ist
return X_initial.reshape(-1, dimensionality)
65 Hauptskript für die sequentielle Optimierung
42) # Für Reproduzierbarkeit der Zufallszahlen
np.random.seed(
# Definition des Suchraums für die Black-Box-Funktion
= -5.0
x_lower_bound = 5.0
x_upper_bound = (x_lower_bound, x_upper_bound)
search_range = 1 # f(x) = x^2 + 1 ist 1-dimensional
dimensionality
# Parameter für den initialen Stichprobenplan
= 7
n_initial_points
# Maximale Gesamtzahl an Funktionsauswertungen
= 20
N_max_evaluations
# --- Schritt 1: Initialen Stichprobenplan erstellen und Black-Box-Funktion auswerten ---
= create_initial_design(n_initial_points, dimensionality, search_range)
X_train_current = f_black_box(X_train_current)
y_train_current
print(f"Initialer X_train (n={n_initial_points}):\n{np.round(X_train_current.flatten(), 3)}")
print(f"Zugehöriger y_train:\n{np.round(y_train_current.flatten(), 3)}")
print("-" * 70)
Initialer X_train (n=7):
[-3.538 -4.115 -2.033 4.626 3.116 1.974 0.369]
Zugehöriger y_train:
[ 17.382 24.522 -0.856 -18.479 10.956 6.114 0.922]
----------------------------------------------------------------------
# --- Schritt 2: KrigingRegressor-Modell initialisieren ---
# Bounds für log10(theta), z.B. 10^-3 bis 10^2
= [(-3.0, 2.0)]
theta_bounds_log10 = KrigingRegressor(initial_theta=1.0, bounds=theta_bounds_log10)
M_1
# --- Schritt 3: Sequenzieller Optimierungs-Loop ---
# Anzahl der Punkte, die nach dem initialen Plan hinzugefügt werden
= N_max_evaluations - n_initial_points
num_infill_steps
for i in range(num_infill_steps + 1): # +1, um auch nach dem letzten Infill-Punkt zu fitten
= len(X_train_current)
current_evals print(f"\n--- Iteration {i+1} (Gesamtauswertungen: {current_evals}) ---")
# Modell mit den aktuellen Daten fitten
print("Kriging-Modell wird gefittet...")
M_1.fit(X_train_current, y_train_current)print(f"Optimiertes Theta: {np.round(M_1.opt_theta_.item(), 4)}")
print(f"Geschätzter globaler Mittelwert (mu_hat): {np.round(M_1.mu_hat_, 4)}")
# Überprüfen, ob die maximale Anzahl an Auswertungen erreicht wurde
if current_evals >= N_max_evaluations:
print("Maximale Anzahl an Auswertungen erreicht. Optimierung wird beendet.")
break
--- Iteration 1 (Gesamtauswertungen: 7) ---
Kriging-Modell wird gefittet...
Optimiertes Theta: -0.1013
Geschätzter globaler Mittelwert (mu_hat): 3.5178
--- Iteration 2 (Gesamtauswertungen: 7) ---
Kriging-Modell wird gefittet...
Optimiertes Theta: -0.1013
Geschätzter globaler Mittelwert (mu_hat): 3.5178
--- Iteration 3 (Gesamtauswertungen: 7) ---
Kriging-Modell wird gefittet...
Optimiertes Theta: -0.1013
Geschätzter globaler Mittelwert (mu_hat): 3.5178
--- Iteration 4 (Gesamtauswertungen: 7) ---
Kriging-Modell wird gefittet...
Optimiertes Theta: -0.1013
Geschätzter globaler Mittelwert (mu_hat): 3.5178
--- Iteration 5 (Gesamtauswertungen: 7) ---
Kriging-Modell wird gefittet...
Optimiertes Theta: -0.1013
Geschätzter globaler Mittelwert (mu_hat): 3.5178
--- Iteration 6 (Gesamtauswertungen: 7) ---
Kriging-Modell wird gefittet...
Optimiertes Theta: -0.1013
Geschätzter globaler Mittelwert (mu_hat): 3.5178
--- Iteration 7 (Gesamtauswertungen: 7) ---
Kriging-Modell wird gefittet...
Optimiertes Theta: -0.1013
Geschätzter globaler Mittelwert (mu_hat): 3.5178
--- Iteration 8 (Gesamtauswertungen: 7) ---
Kriging-Modell wird gefittet...
Optimiertes Theta: -0.1013
Geschätzter globaler Mittelwert (mu_hat): 3.5178
--- Iteration 9 (Gesamtauswertungen: 7) ---
Kriging-Modell wird gefittet...
Optimiertes Theta: -0.1013
Geschätzter globaler Mittelwert (mu_hat): 3.5178
--- Iteration 10 (Gesamtauswertungen: 7) ---
Kriging-Modell wird gefittet...
Optimiertes Theta: -0.1013
Geschätzter globaler Mittelwert (mu_hat): 3.5178
--- Iteration 11 (Gesamtauswertungen: 7) ---
Kriging-Modell wird gefittet...
Optimiertes Theta: -0.1013
Geschätzter globaler Mittelwert (mu_hat): 3.5178
--- Iteration 12 (Gesamtauswertungen: 7) ---
Kriging-Modell wird gefittet...
Optimiertes Theta: -0.1013
Geschätzter globaler Mittelwert (mu_hat): 3.5178
--- Iteration 13 (Gesamtauswertungen: 7) ---
Kriging-Modell wird gefittet...
Optimiertes Theta: -0.1013
Geschätzter globaler Mittelwert (mu_hat): 3.5178
--- Iteration 14 (Gesamtauswertungen: 7) ---
Kriging-Modell wird gefittet...
Optimiertes Theta: -0.1013
Geschätzter globaler Mittelwert (mu_hat): 3.5178
66 — Schritt 4: Surrogat-basierte Optimierung zur Suche des nächsten Infill-Punktes —
Definiere die Zielfunktion für die innere Optimierung (Minimierung des Surrogats)
def surrogate_objective(x_val):
# Stelle sicher, dass x_val im richtigen Format (2D-Array) für predict ist
= np.atleast_2d(x_val)
x_val_reshaped return M_1.predict(x_val_reshaped).item() # Rückgabe als Skalar
67 Suchgrenzen für die Optimierung auf dem Surrogatmodell
= [(x_lower_bound, x_upper_bound)] * dimensionality
surrogate_search_bounds
print("Optimierung auf dem Surrogatmodell, um den nächsten Infill-Punkt zu finden...")
# Anfangsschätzung für die Surrogat-Optimierung (zufälliger Punkt im Suchraum)
= np.random.uniform(low=x_lower_bound, high=x_upper_bound, size=dimensionality)
x0_surrogate_opt
# Verwende L-BFGS-B für die Surrogat-Optimierung
= minimize(surrogate_objective,
surrogate_opt_result =x0_surrogate_opt,
x0='L-BFGS-B',
method=surrogate_search_bounds)
bounds
= np.atleast_2d(surrogate_opt_result.x) # Sicherstellen, dass 2D-Array
x_new_infill = f_black_box(x_new_infill) # Auswertung des neuen Punktes durch Black-Box
y_new_infill
print(f"Neuer Infill-Punkt x_1: {np.round(x_new_infill.flatten(), 3)}, "
f"Zugehöriger y_1 (Black-Box): {np.round(y_new_infill.item(), 3)}")
# Neuen Punkt zum Trainingsdatensatz hinzufügen
= np.vstack((X_train_current, x_new_infill))
X_train_current = np.vstack((y_train_current, y_new_infill))
y_train_current
print(f"Aktueller X_train (total {len(X_train_current)} Punkte):\n{np.round(X_train_current.flatten(), 3)}")
print(f"Aktueller y_train:\n{np.round(y_train_current.flatten(), 3)}")
print("-" * 70)
print("\n--- Optimierungsprozess abgeschlossen ---")
Optimierung auf dem Surrogatmodell, um den nächsten Infill-Punkt zu finden...
Neuer Infill-Punkt x_1: [-1.93], Zugehöriger y_1 (Black-Box): -1.178
Aktueller X_train (total 8 Punkte):
[-3.538 -4.115 -2.033 4.626 3.116 1.974 0.369 -1.93 ]
Aktueller y_train:
[ 17.382 24.522 -0.856 -18.479 10.956 6.114 0.922 -1.178]
----------------------------------------------------------------------
--- Optimierungsprozess abgeschlossen ---
68 Fitte das finale Modell mit allen verfügbaren N_max_evaluations Punkten
M_1.fit(X_train_current, y_train_current)
# Visualisierung der Ergebnisse
= np.linspace(x_lower_bound, x_upper_bound, 200).reshape(-1, 1)
x_plot = M_1.predict(x_plot)
y_predict_final = f_black_box(x_plot)
y_true
=(10, 6))
plt.figure(figsize'k--', linewidth=2, label='Wahre Funktion: $f(x)$')
plt.plot(x_plot, y_true, 'bo', markersize=8, label=f'Ausgewertete Punkte (N={len(X_train_current)})')
plt.plot(X_train_current, y_train_current, 'r-', linewidth=2, label='Kriging-Vorhersage')
plt.plot(x_plot, y_predict_final, 'Kriging-Modell mit sequenzieller Optimierung der Hyperparameter')
plt.title('x')
plt.xlabel('y')
plt.ylabel(
plt.legend()True)
plt.grid(
plt.show()
print(f"\nFinal optimiertes Theta: {np.round(M_1.opt_theta_.item(), 4)}")
print(f"Final geschätzter globaler Mittelwert: {np.round(M_1.mu_hat_, 4)}")
# Der letzte optimierte Punkt auf dem Surrogat (x_new_infill) ist eine gute Schätzung des Optimums der Funktion.
print(f"Bester gefundener x-Wert (aus Surrogat-Optimierung): {np.round(surrogate_opt_result.x.item(), 3)}")
print(f"Zugehöriger y-Wert der Black-Box an diesem x: {np.round(f_black_box(surrogate_opt_result.x).item(), 3)}")
Final optimiertes Theta: -0.577
Final geschätzter globaler Mittelwert: 2.8055
Bester gefundener x-Wert (aus Surrogat-Optimierung): -1.93
Zugehöriger y-Wert der Black-Box an diesem x: -1.178