Skip to content
Permalink
95cf9f74a9
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
1079 lines (895 sloc) 52.9 KB
#import sys
from symbol import parameters
import time
import numpy as np
import json
import pickle
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
from skopt import Optimizer
from skopt.learning.gaussian_process.kernels import ConstantKernel, Matern, Sum, RBF, WhiteKernel
from skopt.learning import GaussianProcessRegressor
from skopt.plots import plot_evaluations, plot_convergence, plot_objective, plot_gaussian_process, plot_histogram, plot_objective_2D
from skopt import expected_minimum
from scipy.stats import multivariate_normal
import time
#from DE_Optimizer import DifferentialEvolution
#import sys
#import os
#from skopt.benchmarks import branin, hart6
#from skopt.space.space import Real, Integer
#from functools import partial
#from skopt import gp_minimize
#from skopt import callbacks
#from skopt import gp_minimize, forest_minimize, dummy_minimize
#from skopt.callbacks import CheckpointSaver
#from skopt.learning import ExtraTreesRegressor
#from joblib import Parallel, delayed
#import time
''' Future plans
Domain reduction
Include a scale factor that only searchs over a smaller region around the best point.
Some gpr libraries slowly decrease the domain size to help with convergence.
Need to make a new gpr class for this. Can't modify previuos one.
Differential evolution switch
Differential evolution switch that turns on DE. The DE alg will take the best
points to create the first population, then start the algoirthm for a fixed number
of points. This could be useful to run after random points to get better points
and help make a better guess of the hyperparameters. Or if GPR is failing, then
we can turn it temporarily since DE is very robust, but slow.
'''
class GPR_Optimizer(Optimizer):
"""GPR_Optimizer is based on Optimizer class of skopt library.
scikit-optimize.github.io/stable
This class allows you to insert the noise for each data point.
1D Example:
opt = GPR_Optimizer(dimensions=[(-2.,2.)], # parameter space
length_scale=[1], length_scale_bounds=[(.01,10.)],
noise_level=0.1, noise_level_bounds=(0.01,1),
n_initial_points=10,
kappa=10)
next_x = opt.ask() # The first n_initial_points will be random, then the GPR optimizer starts.
f_val = objective(next_x) #objective is the cost function
res = opt.tell(next_x, f_val, error, fit=True) # Report the value.
"""
def __init__(self, dimensions, length_scale, length_scale_bounds, noise_level=0.1, noise_level_bounds=[1e-3,2e-1], n_initial_points=10, x_initial=[],
acq_func="LCB", kappa=4*1.96, xi=4*0.01, alpha=[], normalize_y=True, sigma_value_bounds=(1e-3,1.0), sigma_value=0.5,
n_restarts_optimizer=0, acq_optimizer='sampling', verbose=True, config_path='optimzer_config.json', domain_reduction=1, best_point=[], hp_fit_n_points=10000, switch=True):
'''
Initialize the GaussianProcessRegressor.
I use RBF + WhiteKernel. The default kernel is "1.0 * RBF(1.0)".
Kernel's hyperparameters are optimized during fitting at tell. Another common
option is kernel = 1.0 * Matern(length_scale=1.0, length_scale_bounds=(1e-1, 1.0), nu=2.5)
which is a generalization of RBF, and equiv when nu = inf, but takes longer to compute.
alpha (default 1e-10) is variance = noise**2 added to diagonal kernel durin fitting.
alpha can be a constant, where then it is the same for all points, or an array
the same length as data. Some sources say it is equiv to white noise added
by a WhiteKernel. But WhiteKernel has a hyperparameter for the noise which
is tuned every time opt.tell(...) is called.
If you set noise = "gaussian", then it will add a WhiteKernel. I below
add a WhiteKernel manually in order to set the noise_level_bounds manually.
Note also that ConstantKernel(constant_value=1**2, constant_value_bounds=constant_value_bounds)*RBF() is equivalent to 1 * RBF(),
expect you can explicitely set the starting value and bounds, which by default are 1.0 and (1e-5, 1e5)
Parameters
----------
dimensions : list of bounds for each parameters
Example: [(-2.,2.), ..., ndims]
Dimensions can be list of (0., 1.) for real, and (0,1) for integers. Other options available like categories.
length_scale : initial length_scale.
Can be float or list of length ndims, In normalized units from 0 to 1.
length_scale_bounds: bounds for each scales
Can be [1e-3,1] or list of bounds for each parameter. In normalized units from 0 to 1.
noise_level : float, initial value of noise hyperparameter for WhiteKernel, which is Gaussian noise added to the RBF kernel.
Example: 1e-3 or None
If noise_level = 0 or None, then WhiteKernel is not included, and the noise only comes from alpha.
NEW: When normalize_y = True, then this is actually noise/std(y). So it is fractional noise over hte std of your data.
noise_level_bounds : bounds
Example: [1e-3, 1]
n_initial_points: number of initial random points.
Sampling method determined by 'initial_point_generator' in opimtizer.py
acq_func : Acquisition function that determines the next best point.
# LCB = lower confidence bound. Uses kappa, not xi.
# PI= probabilit of improvement. PI doesn't care how much better predictions are than previous values, since it gives equal reward. Uses kappa, not xi.
# EI = expective improvement. Uses xi, which is the desired improvement. Does not use kappa.
# "gp_hedge" uses all three, then favors the ones that improve.
# Advice: if exploration is priority, use LCB. If exploitation, then use PI. For balanced, use EI.
kappa : Controls exploration for local optimization. kappa high (even inf) favors exploration
1.96 default. kappa contros how much of the variance in the predicted value should be taken into account.
kappa is used by LCB and . xi is used for EI and is goal of improvement to shoot for over the previous best value.
Code currently uses this kappa to set both. Maybe in future make xi its own variable.
xi : float 0 to 1, parameter for EI acquisition function
Determines the desired improvement, for x=0.01 is 1% change.
alpha : diagonal of the covariance matrix, which is the variance = err^2
float or array-like, optional (default: 1e-10)
The tell function can update a list of alphas that give the variance of eery point.
If is too small then will have trouble with optimizer model fits.
ONLY USE alpha BY ENTERING IT WITH tell() OR NORMALIZE_Y WILL MESS IT UP.
If using normalize_y = True, then it this is fractional variance over the std of your data.
x_initial : [] or [ (0,0) ] for 2D. list of initial points to be taken first.
constant_value : inintial constant for RBF kernel.
The kernel is sigma^2 exp(- |x - x'|^2/2l^2 ). The constant is sigma^2. It allows larger outputs, increaseing the flexibiliyt of the GPR model. Important to set similar to expected scale.
constant_value_bounds : bounds for constant_value
acq_optimizer : 'lbfgs' or 'sampling'. For finding hyperparameters.
Takes 10,000 points of hyperparameters, including current setting. 'sampling' uses best.
'lbfgs' takes best 'n_restart_optimizer' points and performs gradient based lbfgs optimization,
then picks best. 'lgbfgs' is slow for high dimensions.
n_restarts_optimizer : 4, see acq-optimizer. If 'lbfgs' selected, number of points to perform lbfgs on.
verbose : Default True, whether to print while running
normalize_y : 'True', when normalize_y true, then the y data has mean subtracted and divided by std at every call of tell() right before it fits new
hp's. # That normalization is not used to change noise level. So noise level above should be interpreted at noise_level/std(y). That would be
the same for alpha,
# expect I now always enter alpha=error^2 with tell, and I have changed tell() to normalize alpha by std as well. So just enter the alpha by using
# the tell(x,y,y_err) , where y_err is in same units as y. If no noise, still put small value of y_err. Cannot use y_err =None or 0!!!!!!
# if you really have no noise, still use small value of y_err, which is necessary for the fitting to converge easily.
'''
# Check types
dimensions = [tuple(float(num) for num in tpl) for tpl in dimensions] # make sure dimensions is a list of tuples of
# Save to class
self.dimensions = dimensions
self.length_scale = length_scale
self.length_scale_bounds = length_scale_bounds
self.noise_level = noise_level
self.noise_level_bounds = noise_level_bounds
self.n_initial_points = n_initial_points
self.x_initial = x_initial
self.acq_func = acq_func
self.kappa = kappa
self.xi = xi
self.alpha = alpha
self.sigma_value_bounds = sigma_value_bounds
self.sigma_value = sigma_value
self.n_restarts_optimizer = n_restarts_optimizer
self.acq_optimizer = acq_optimizer
self.verbose = verbose
self.file_path = file_path
self.change_flag=change_flag
self.switch=switch
self.hp_fit_n_points = hp_fit_n_points
self.domain_reduction = domain_reduction
self.best_point = best_point
self.config_path = config_path
# Define the constant value bounds for the amplitude parameter
#constant_value_bounds = (1e-3, 1e3) # Square since the kernel uses the square of the value, default = 1e-5 to 1e5
# constant value is variance, so square sigmas
constant_value = sigma_value**2
constant_value_bounds = ( sigma_value_bounds[0]**2, sigma_value_bounds[1]**2)
if noise_level == 0 or noise_level is None:
# If no noise, don't add WhiteKernel
kernel = ConstantKernel(constant_value=constant_value, constant_value_bounds=constant_value_bounds) * RBF(length_scale=length_scale, length_scale_bounds=length_scale_bounds)
print("Using no noise.")
else:
# If noise is present, add WhiteKernel
kernel = ConstantKernel(constant_value=constant_value, constant_value_bounds=constant_value_bounds) * RBF(length_scale=length_scale, length_scale_bounds=length_scale_bounds) + WhiteKernel(noise_level=noise_level, noise_level_bounds=noise_level_bounds) #starting hp. fitting at each run.
gpr = GaussianProcessRegressor(kernel=kernel,
alpha=alpha,
normalize_y=normalize_y, #XXXXXXXXXXXXXXXXXXXXXXXX
# Subtracts mean and normalize std to 1. It is done at beginning of fit, before alpha.
# in _gpr.py of sklearn, y = (y - self._y_train_mean) / self._y_train_std
# Can access them at opt.models[-1].y_train_std_
# Also weird because adjusts y data, but doesn't change theta or alpha.
# So if normalization changes a lot, then the hyperparameer and alpha will have to change.
# Whether the target values y are normalized, i.e., the mean of the
# observed target values become zero. This parameter should be set to
# True if the target values' mean is expected to differ considerable from
# zero. When enabled, the normalization effectively modifies the GP's
# prior based on the data, which contradicts the likelihood principle;
# normalization is thus disabled per default.
#noise="gaussian", # noise="gaussian" adds WhiteKernel, but I added manually. "off" doesn't work, so need to not include.
n_restarts_optimizer=n_restarts_optimizer
# how many The number of restarts of the optimizer for finding the kernel's hyperparameters. The first run
# of the optimizer is performed from the kernel's initial parameters,
# the remaining ones (if any) from thetas sampled log-uniform randomly
# from the space of allowed theta-values. If greater than 0, all bounds
# must be finite. Note that n_restarts_optimizer == 0 implies that one
# run is performed.
)
# For domain reduction is not 1 and best_point not empty, set dimensions to reduced dimensions right here!
super().__init__(
dimensions, # dimensions,
base_estimator= gpr, #"GP", #gpr, # default "GP" # base estimator, also GP, RF, ET, GBRT
n_initial_points=n_initial_points, #default 10
acq_optimizer=acq_optimizer, #'sampling', 'lbfgs', or auto (which chooses between two). 'lbfgs' is slow for high dim and many points
initial_point_generator="lhs", # random (default), sobol, halton, hammersly, lhs (latin hypercube sampling), grid. See optimizer.py for more details.
acq_func=acq_func, #(randomly choose LCB, EI, or PI) #"gp_hedge", "LCB", "EI", "PI".
acq_func_kwargs = {"xi":xi, "kappa":kappa}, # kappa for LCB, xi for EI.
acq_optimizer_kwargs = {"n_points":hp_fit_n_points, "n_jobs":2} # settings to speed up tell(). n_jobs sets number of jobs in Parallel. sampling of hp space is always done with 'n_points' points. If lbfgs, then the 'n_restarts_optimizer' best points of those are used as start for 20 iterations of lbfgs optimizaation. #"n_restarts_optimizer":1,
)
self.error = [] # List of errors of measured points. Must float or be an array of length of number data points.
self.x_initial = x_initial # initial points to run
self.verbose = verbose # Whether to print statements
self.mean_for_y_norm = 0 # NOT USING. mean and std for normalizing all the y data. This is so that you don't have to change ConstantValue.
self.std_for_y_norm = 1 # NOT USING
def ask(self, *args):
''' Same as optimizer class.
# Obtains next point. If < n_initial_point, then from initial_point_generator.
Otherwise it uses the acq_func to find the next point uses either LCB, EI, or PI.
Default is "gp_hedge" method which uses all three methods with gains based on which
improves.
If called twice in a row,then gives same value until tell() is called.
'''
# Run poitns from x_intiial until list is empty. Removes the element of x_initial.
if self.x_initial: # Check if the list is not empty
next_x = self.x_initial.pop(0) # Remove and get the first element
# first_element is now the removed element
# x_initial is now the list without its first element
print("Removed element:", next_x)
else:
next_x = super().ask(*args) # very fast
#print("next_x: ", next_x)
return next_x
def tell(self, next_x, f_val, error=None, fit=True):
''' Reports a measured value. Inherits Optimizer class, but also allows you to put in point dependence noise, if eg the noise is different
between points or you know the noise from repeated measurements.
next_x : list, parameters for measured point
f_val : measured value
error : If error included, then uses it to define point specific alpha. If error=0 and then all noise is in initial alpha parameter and WhiteKernel.
fit : Whether to refit hyperparameters at the end
Note: tell() is most time consuming. Can speed up by reducing 'n_restarts_optimizer' and maybe sampling method.
If normalize_y = True, then gpr.py will normalize y data by subtracting mean and dividing by std every time tell() is called.
You can access the std and mean from the last iteratio with self.models[-1].y_train_std_
It does not normalize noise or alpha. I have setup alpha such that alpha = error^2/ std^2 so we can keep error
in the units of the cost that we enter.
'''
# normalize
f_val = self.normalize_y(f_val)
if error is not None:
error = error/self.std_for_y_norm # normalize
# update alpha
if error is not None:
self.error = np.append(self.error, error )
# if self.models: # check if not empty
# std = self.models[-1].y_train_std_ # the last used standard deviation of y data
# else:
if len(self.yi)>3:
std = np.std(self.yi) # standard dev of the y data. Use in tell() in gpr.py to normalize data
self.base_estimator_.alpha= self.error**2 / std**2 #use this to update alpha during scan. the fit always clones the base estimtator.
#self.update_next() # use this if x different from latest ask value.
start_time = time.time()
res = super().tell(next_x, f_val, fit=fit)
end_time = time.time()
if np.abs(end_time - start_time) > 1:
print("elapsed time for tell: ", end_time - start_time)
if len(self.models) != 0:
hp = np.exp(self.models[-1].kernel_.theta) # get latest hyperparameters, [] list. Note that y-data is normalized, so alpha and noisevalue are normalized by std.
likelihood = self.models[-1].log_marginal_likelihood_value_ # float, log-marginal likelihood of hyperparameters
if self.verbose:
formatted_hp = [f"{x:.2e}" for x in hp]
formatted_likelihood = f"{likelihood:.1f}"
print("[hp]: ", formatted_hp, ' LML: ', formatted_likelihood)
return res
def normalize_y(self, y, inverse=False):
''' Subtract mean and divide by std in order to make y data have std = 1, and mean = 0.
# This is so ConstantValue can always be close to 1. '''
mean = self.mean_for_y_norm
std = self.std_for_y_norm
if inverse is False:
return (y - mean)/std
if inverse is True:
return y*std + mean
############# PLOTTING ###############################
def expected_minimum(self, **kwargs):
'''Options: n_random_starts : int, default=20
The number of random starts for the minimization of the surrogate
model.'''
# Calculate the expected minimium of the model
x_exp, best_fun = expected_minimum(self.get_result(), **kwargs)
return x_exp, best_fun
def get_hyperparameters(self):
''' Get list of all hyperparameters and likelihoods for all models.
theta is np.log(hyperparameter), so I take exp^(theta).
The hyperparameters are (annoyingly) in opt.models[i].kernel_.theta
To see the latest kernel with parameters: opt.models[-1].kernel_
This shows you what each hyperparameter is.
To see just the np.log() of value, use np.exp(opt.models[-1].kernel_.theta)
I think these parameters are arrange left to right from print(opt.models[-1].kernel_)
The magnitude of the LML isn't necessarily meaninfgul. Comparison '
LML values are typically negative. The probability is between 0 and 1, and taking log gives negative number.
'''
opt = self
hp = [np.exp(opt.models[i].kernel_.theta) for i in range(len(opt.models))] # has n_iter - n_initial_ponit + 1 models.
likelihood = [opt.models[i].log_marginal_likelihood_value_ for i in range(len(opt.models))]
print(opt.models[-1].kernel_) # # Print latest kernel
return hp, likelihood
def plot_hyperparameters(self):
''' plot the hyperparameters vs iteration'''
hp, likelihood = self.get_hyperparameters()
# create an array of indices from 0 to the length of the list
lst = hp
x = np.arange(len(lst))
# loop over the number of elements in each array (assuming they are all the same length)
for i in range(len(lst[0])):
# create an empty array to store the ith element of each array
y = np.empty(len(lst))
# loop over the list of arrays and fill the y array with the ith element
for j in range(len(lst)):
y[j] = lst[j][i]
# plot x vs y with a label
if i==0:
# Constant value hyperparameter
label='sigma' # this is sqrt of ConstantValue
y = np.sqrt(np.abs(y)) #
elif i==len(lst[0])-1 and self.noise_level is not None:
label='noise'
#y = y*self.std_for_y_norm
else:
label = f'length scale {i+1-1}'
plt.plot(x, y, label=label)
plt.yscale('log')
plt.xlabel('run (after initial)')
plt.ylabel('hyperparameter')
plt.title(str(self.models[-1].kernel_))
plt.grid()
# add a legend and show the plot
plt.legend()
plt.show()
def predict():
''' add later '''
# Predict output for X.
# In addition to the mean of the predictive distribution, also its
# standard deviation (return_std=True) or covariance (return_cov=True),
# the gradient of the mean and the standard-deviation with respect to X
# can be optionally provided.
return 0
def plot_LML(self):
''' Plots the log marginal likelihood noise level and length scale for 1D scan.
Only works for 1D.
This was taken from a scikit learn example.'''
opt = self
hp_final = np.exp(opt.models[-1].kernel_.theta)
print("final noise parameters:", hp_final )
gpr = opt.models[-1]
print(gpr.kernel_)
length_scale = np.logspace(-2, 2, num=50)
noise_level = np.logspace(-2, 1, num=50)
length_scale_grid, noise_level_grid = np.meshgrid(length_scale, noise_level)
log_marginal_likelihood = [
gpr.log_marginal_likelihood(theta=np.log([hp_final[0], scale, noise])) # insert first hp here (amp of RBF)
for scale, noise in zip(length_scale_grid.ravel(), noise_level_grid.ravel())
]
log_marginal_likelihood = np.reshape(
log_marginal_likelihood, newshape=noise_level_grid.shape
)
vmin, vmax = (-log_marginal_likelihood).min(), 4*50 # set max height here.
level = np.around(np.logspace(np.log10(vmin), np.log10(vmax), num=50), decimals=1)
plt.contour(
length_scale_grid,
noise_level_grid,
-log_marginal_likelihood,
levels=level,
norm=LogNorm(vmin=vmin, vmax=vmax),
)
plt.colorbar()
plt.xscale("log")
plt.yscale("log")
plt.xlabel("Length-scale")
plt.ylabel("Noise-level")
plt.title("Log-marginal-likelihood")
plt.show()
def plot_convergence(self, **kwargs):
# from skopt.plots
# if normalize_y = True, this is still in unnormalized units
res = self.get_result()
plot_convergence(res, **kwargs) # plot at every iteration the best point. I want to plot each point.
def plot_objective(self, **kwargs):
'''
See https://scikit-optimize.github.io/stable/auto_examples/plots/partial-dependence-plot.html#sphx-glr-auto-examples-plots-partial-dependence-plot-py
partial dependence plot. Partial dependence plots average over the other dimensions, rather than showing cross-sections. See below.This is not the same thing as plotting the cross-sections.
It shows how the objective function changes with changes in the values of one or two hyperparameters, while marginalizing over the values of all other hyperparameters. This is useful to understand the relationship between the hyperparameters and the objective function.
This function takse a long time for > 3 dimensions. See options below to speed up.
Calculating partial dependence is expensive.Objective Function Minimum: The lowest value in the partial dependence plot represents the minimum value of the objective function with respect to the hyperparameter(s) being plotted, while averaging out the effects of the other hyperparameters. This value indicates the most optimal (lowest) average outcome for the given range(s) of the hyperparameter(s) in question.
Best Average Performance: In hyperparameter optimization, this minimum value suggests the best average performance that can be expected from the model when the hyperparameter(s) is/are set within the specific range(s) shown in the plot. This is particularly useful when you want to understand how sensitive the model's performance is to changes in one or two specific hyperparameters.
Not Necessarily the Global Optimum: It's important to note that this value does not necessarily represent the global optimum of the entire hyperparameter space. Partial dependence plots marginalize over other dimensions, meaning they average out the effects of other hyperparameters. Therefore, the lowest value in these plots is more about understanding the average behavior of the model in relation to specific hyperparameters rather than pinpointing the exact best hyperparameter combination.
def plot_objective(result, levels=10, n_points=40, n_samples=250, size=2,
zscale='linear', dimensions=None, sample_source='random',
minimum='result', n_minimum_search=None, plot_dims=None,
show_points=True, cmap='viridis_r'):
Plot a 2-d matrix with so-called Partial Dependence plots
of the objective function. This shows the influence of each
search-space dimension on the objective function.
This uses the last fitted model for estimating the objective function.
The diagonal shows the effect of a single dimension on the
objective function, while the plots below the diagonal show
the effect on the objective function when varying two dimensions.
The Partial Dependence is calculated by averaging the objective value
for a number of random samples in the search-space,
while keeping one or two dimensions fixed at regular intervals. This
averages out the effect of varying the other dimensions and shows
the influence of one or two dimensions on the objective function.
Also shown are small black dots for the points that were sampled
during optimization.
A red star indicates per default the best observed minimum, but
this can be changed by changing argument ´minimum´.
.. note::
The Partial Dependence plot is only an estimation of the surrogate
model which in turn is only an estimation of the true objective
function that has been optimized. This means the plots show
an "estimate of an estimate" and may therefore be quite imprecise,
especially if few samples have been collected during the
optimization
(e.g. less than 100-200 samples), and in regions of the search-space
that have been sparsely sampled (e.g. regions away from the optimum).
This means that the plots may change each time you run the
optimization and they should not be considered completely reliable.
These compromises are necessary because we cannot evaluate the
expensive objective function in order to plot it, so we have to use
the cheaper surrogate model to plot its contour. And in order to
show search-spaces with 3 dimensions or more in a 2-dimensional
plot,
we further need to map those dimensions to only 2-dimensions using
the Partial Dependence, which also causes distortions in the plots.
Parameters
----------
result : `OptimizeResult`
The optimization results from calling e.g. `gp_minimize()`.
levels : int, default=10
Number of levels to draw on the contour plot, passed directly
to `plt.contourf()`.
n_points : int, default=40
Number of points at which to evaluate the partial dependence
along each dimension.
n_samples : int, default=250
Number of samples to use for averaging the model function
at each of the `n_points` when `sample_method` is set to 'random'.
size : float, default=2
Height (in inches) of each facet.
zscale : str, default='linear'
Scale to use for the z axis of the contour plots. Either 'linear'
or 'log'.
dimensions : list of str, default=None
Labels of the dimension
variables. `None` defaults to `space.dimensions[i].name`, or
if also `None` to `['X_0', 'X_1', ..]`.
plot_dims : list of str and int, default=None
List of dimension names or dimension indices from the
search-space dimensions to be included in the plot.
If `None` then use all dimensions except constant ones
from the search-space.
sample_source : str or list of floats, default='random'
Defines to samples generation to use for averaging the model function
at each of the `n_points`.
A partial dependence plot is only generated, when `sample_source`
is set to 'random' and `n_samples` is sufficient.
`sample_source` can also be a list of
floats, which is then used for averaging.
Valid strings:
- 'random' - `n_samples` random samples will used
- 'result' - Use only the best observed parameters
- 'expected_minimum' - Parameters that gives the best
minimum Calculated using scipy's minimize method.
This method currently does not work with categorical values.
- 'expected_minimum_random' - Parameters that gives the
best minimum when using naive random sampling.
Works with categorical values.
minimum : str or list of floats, default = 'result'
Defines the values for the red points in the plots.
Valid strings:
- 'result' - Use best observed parameters
- 'expected_minimum' - Parameters that gives the best
minimum Calculated using scipy's minimize method.
This method currently does not work with categorical values.
- 'expected_minimum_random' - Parameters that gives the
best minimum when using naive random sampling.
Works with categorical values
n_minimum_search : int, default = None
Determines how many points should be evaluated
to find the minimum when using 'expected_minimum' or
'expected_minimum_random'. Parameter is used when
`sample_source` and/or `minimum` is set to
'expected_minimum' or 'expected_minimum_random'.
show_points: bool, default = True
Choose whether to show evaluated points in the
contour plots.
cmap: str or Colormap, default = 'viridis_r'
Color map for contour plots. Passed directly to
`plt.contourf()`
Returns
-------
ax : `Matplotlib.Axes`
A 2-d matrix of Axes-objects with the sub-plots.
'''
# from skopt.plots
# if normalize_y = True, this is still in unnormalized units
res = self.get_result()
plot_objective(res, **kwargs) # plot at every iteration the best point. I want to plot each point.
def plot_histogram(self, **kwargs):
# from skopt.plots
# if normalize_y = True, this is still in unnormalized units
res = self.get_result()
plot_histogram(res, **kwargs) # plot at every iteration the best point. I want to plot each point.
def plot_gaussian_process(self, **kwargs):
# from skopt.plots
res = self.get_result()
#plot_gaussian_process(res) # plot at every iteration the best point. I want to plot each point.
plot_gaussian_process(res, #objective=objective,
#noise_level=0.001,
show_next_point=False,
show_acq_func=True, **kwargs)
def plot_evaluations(self, **kwargs):
# from skopt.plots
res = self.get_result()
plot_evaluations(res, **kwargs) # plot at every iteration the best point. I want to plot each point.
def plot_1D_with_objective(self, objective_wo_noise=None, noise_level=1e-3):
''' Plotting for 1D functions with known objective. This is for benchmarking.
'''
opt = self
res = opt.get_result()
print("best point:", res.x, res.fun)
# print(res.x_iters)
# print(res.func_vals)
# % Plotting
plot_args = {"objective": objective_wo_noise,
"noise_level": noise_level, "show_legend": True,
"show_title": True, "show_next_point": False,
"show_acq_func": True}
_ = plot_gaussian_process(opt.get_result(), objective=objective_wo_noise,
noise_level=noise_level,
show_next_point=False,
show_acq_func=True) # **plot_args
plt.show()
if 1:
plt.figure()
_ = plot_convergence( opt.get_result() ) # plot at every iteration the best point. I want to plot each point.
plt.figure()
_ = plot_evaluations(opt.get_result())
plt.figure()
_ = plot_objective(opt.get_result()) # plots the gpr function (add error)
# plot poits vs iteration
plt.figure()
plt.plot(opt.yi,'.') #opt.Xi,
plt.xlabel("Iteration")
plt.ylabel("Function Value")
plt.show()
def save(self, file = 'hloop.pkl'):
with open(file, 'wb') as f:
pickle.dump(self, f)
def load(self, file):
with open(file, 'rb') as f:
hloop_restored = pickle.load(f)
return hloop_restored
def write_to_file(self, file_path='optimizer_config.json'):
config = {
'parameters': {
'change_flag': self.change_flag,
'length_scale': self.length_scale,
'length_scale_bounds': self.length_scale_bounds,
'dimensions': self.dimensions,
'noise_level': self.noise_level,
'noise_level_bounds': self.noise_level_bounds,
'sigma_value':self.sigma_value,
'sigma_value_bounds':self.sigma_value_bounds,
'alpha': self.alpha,
'kappa':self.kappa,
'xi':self.xi,
'switch':self.switch
},
'data': {
'x_list': self.Xi,
'y_list': self.yi
}
}
with open(file_path, 'w') as file:
json.dump(config, file, indent=4)
def update_file(self, file_path='optimizer_config.json'):
self.write_to_file(file_path)
def check_file_format(self, file_path='optimizer_config.json'):
try:
with open(file_path, 'r') as file:
data = json.load(file)
return True
except Exception as e:
print(f"Error in file format: {e}")
return False
def load_optimizer_from_file(self, file_path='optimizer_config.json'):
with open(file_path, 'r') as file:
config = json.load(file)
opt = GPR_Optimizer(
dimensions=config['parameters']['dimensions'],
length_scale=config['parameters']['length_scale'],
length_scale_bounds=config['parameters']['length_scale_bounds'],
noise_level=config['parameters']['noise_level'],
noise_level_bounds=config['parameters']['noise_level_bounds'],
sigma_value=config['parameters']['sigma_value'],
sigma_value_bounds= config['parameters']['sigma_value_bounds'],
alpha=config['parameters']['alpha'],
kappa=config['parameters']['kappa'],
xi=config['parameters']['xi'],
switch=config['parameters']['switch']
)
opt.Xi=config['data']['x_list']
opt.yi=config['data']['y_list']
return opt
def plot_cross_section(self, xpoint = None ):
''' Plot the cross-sections through the best measured point. Still working on this.
Plotobjective plots the partial dependence. Somtimes I just want to see a line for each parameter
through the best point.
xpoint = point to plot cross-section, in the unit of dimensions.
Later make it possible to use best measured or best predicted.
'''
# Get the latest gpr model
res = self.get_result()
gpr_model = res.models[-1]
# Create a list of bounds for each dimension
bounds_list = [dim.bounds for dim in self.space.dimensions]
num_params = len(bounds_list)
bounds_list_norm = [(0.0, 1.0)]*num_params # because predict(x) accepts 0 to 1
# I NEED TO HERE CONVERT BEST POINT TO 0 to 1 for each dimension.
# Then bounds_list should just be 0 to 1 for all
if xpoint is None:
if best_type=='measured':
x = res.x # Best point found by the optimizer [x0, x1, ....]
elif best_type=='model':
x_exp, y_exp = self.expected_minimum() # to use minimium of the model, not measured min
else:
x = xpoint # manually enter point
# Convert x to normalized 0 to 1 coordinates, because predict(x) accepts that
x_norm = [(value - min_bound) / (max_bound - min_bound) for value, (min_bound, max_bound) in zip(x, bounds_list)]
for i, bounds in enumerate(bounds_list_norm):
# Create a grid of values for the current hyperparameter
#x_grid = np.linspace(bounds[0], bounds[1], num=100) # gpr.predict(x) requires x from 0 to 1
x_grid = np.linspace(0, 1, num=100) # range has to be 0 to 1 for every parameter
# Prepare the input for the model
x_input = np.tile(x_norm, (len(x_grid), 1)) # Create a matrix where each row is the point x
x_input[:, i] = x_grid # Replace the ith column with the grid values
# Predict using the GPR model
y_pred, sigma = gpr_model.predict(x_input, return_std=True) # gpr.predict(x) requires x from 0 to 1
# Plotting
plt.plot(x_grid, y_pred, label=f'variable {i + 1}')
plt.fill_between(x_grid, y_pred - sigma, y_pred + sigma, alpha=0.2)
plt.xlabel('Parameter (normalized to (0,1)) ')
plt.ylabel('GPR model estimate')
plt.title('Cross-sections through the best point')
plt.legend()
plt.show()
def plot_2D_cross_sections(self):
"""
Plot 2D cross-sections through the optimization space.
This function generates 2D cross-section plots through the best point found by the optimizer.
If the optimization space consists of only two parameters, a contour plot of the entire space is shown.
For more than two parameters, the function plots cross-sections for each pair of parameters,
holding the other parameters at their values in the best point.
"""
figures=[]
# Get the latest gpr model
res = self.get_result()
gpr_model = res.models[-1]
x_best = res.x
bounds_list = [dim.bounds for dim in self.space.dimensions]
# Function to normalize a point
def normalize_point(point, bounds_list):
return [(x - low) / (high - low) for x, (low, high) in zip(point, bounds_list)]
# Normalize the best point
x_best_norm = normalize_point(x_best, bounds_list)
# Create the plots
num_params = len(x_best)
if num_params == 2:
# Just plot the entire space for 2 parameters
x_grid, y_grid = np.meshgrid(
np.linspace(0, 1, 100),
np.linspace(0, 1, 100)
)
xy_grid = np.vstack([x_grid.ravel(), y_grid.ravel()]).T
predictions, sigma = gpr_model.predict(xy_grid, return_std=True)
# Reshape for plotting
predictions = predictions.reshape(x_grid.shape)
sigma = sigma.reshape(x_grid.shape)
# Plot
plt.figure()
cp = plt.contourf(x_grid, y_grid, predictions, alpha=0.7)
plt.colorbar(cp)
plt.title('2D Cross-section of the entire space')
plt.xlabel('Parameter 1')
plt.ylabel('Parameter 2')
plt.show()
else:
# Plot 2D cross-sections for each pair of parameters, holding other parameters at best
for i in range(num_params):
for j in range(i+1, num_params):
x_grid, y_grid = np.meshgrid(
np.linspace(0, 1, 100),
np.linspace(0, 1, 100)
)
# Create a grid for predictions
grid = np.tile(x_best_norm, (100 * 100, 1))
grid[:, i] = x_grid.ravel()
grid[:, j] = y_grid.ravel()
# Make predictions
predictions, sigma = gpr_model.predict(grid, return_std=True)
# Reshape for plotting
predictions = predictions.reshape(x_grid.shape)
sigma = sigma.reshape(x_grid.shape)
# Plot
plt.figure()
cp = plt.contourf(x_grid, y_grid, predictions, alpha=0.7)
plt.colorbar(cp)
plt.title(f'2D Cross-section between Parameter {i+1} and Parameter {j+1}')
plt.xlabel(f'Parameter {i+1}')
plt.ylabel(f'Parameter {j+1}')
plt.show()
return figures
def save(self, file = 'hloop.pkl'):
with open(file, 'wb') as f:
pickle.dump(self, f)
def load(self, file):
with open(file, 'rb') as f:
hloop_restored = pickle.load(f)
return hloop_restored
def write_to_file(self, config_path='optimizer_config.json'):
'''
Write json file with gpr parameters that can be adjusted.
'''
config = {
'parameters': {
'length_scale': self.length_scale,
'length_scale_bounds': self.length_scale_bounds,
'dimensions': self.dimensions,
'noise_level': self.noise_level,
'noise_level_bounds': self.noise_level_bounds,
'sigma_value':self.sigma_value,
'sigma_value_bounds':self.sigma_value_bounds,
'alpha': self.alpha,
'kappa':self.kappa,
'xi':self.xi
},
'data': {
'x_list': self.Xi,
'y_list': self.yi
}
}
with open(config_path, 'w') as file:
json.dump(config, file, indent=4)
def update_file(self, config_path='optimizer_config.json'):
''' Why do we need write_to_file and update_file? - Jon'''
self.write_to_file(config_path)
def check_file_format(self, config_path='optimizer_config.json'):
'''Checks if the JSON file can be read. '''
try:
with open(config_path, 'r') as file:
data = json.load(file)
return True
except Exception as e:
print(f"Error in file format: {e}")
return False
def load_optimizer_from_file(self, config_path='optimizer_config.json'):
"""
Loads a GPR (Gaussian Process Regression) Optimizer configuration from a JSON file and initializes the optimizer with these settings.
This function reads a JSON configuration file that specifies the parameters for a GPR Optimizer. The JSON file should contain two main sections: 'parameters', which includes all the necessary parameters for the GPR Optimizer, and 'data', which includes the data points (x_list and y_list) to be used by the optimizer.
Parameters:
- config_path (str): The path to the JSON configuration file. Defaults to 'optimizer_config.json'.
Returns:
- opt: An instance of the GPR_Optimizer class, initialized with the parameters and data from the configuration file.
Example usage:
optimizer = MyClass() # Assuming this method is part of MyClass
opt = optimizer.load_optimizer_from_file("path/to/config.json")
Note:
- The function prints "loading config..." to the console once the configuration is successfully loaded.
- Ensure the JSON file exists at the specified path and adheres to the expected format.
"""
with open(config_path, 'r') as file:
config = json.load(file)
# Also allow for domain reduction here.
#If domain_reduction changed, then define best_point with self.expected_minimium or Xi[argmin(self.y)],
# then define dimensios centered around that. around that.
# take the dimensions config['parameters']['dimensions'], then find new dimensions.
opt = GPR_Optimizer(
dimensions=config['parameters']['dimensions'],
length_scale=config['parameters']['length_scale'],
length_scale_bounds=config['parameters']['length_scale_bounds'],
noise_level=config['parameters']['noise_level'],
noise_level_bounds=config['parameters']['noise_level_bounds'],
sigma_value=config['parameters']['sigma_value'],
sigma_value_bounds= config['parameters']['sigma_value_bounds'],
alpha=config['parameters']['alpha'],
kappa=config['parameters']['kappa'],
xi=config['parameters']['xi'],
n_initial_points=1, # turn off randomly sampled initial points. Test it being 0.
acq_func = self.acq_func,
normalize_y = self.normalize_y,
n_restarts_optimizer = self.n_restarts_optimizer,
acq_optimizer = self.acq_optimizer,
verbose = self.verbose,
config_path = self.config_path,
domain_reduction = config['parameters']['domain_reduction'],
best_point = [] ,
hp_fit_n_points = self.hp_fit_n_points
)
# Reload the data points
opt.Xi=config['data']['x_list']
opt.yi=config['data']['y_list']
print("loading config...")
# Filter out Xi, yi that are outside new bounds. Also need to do for domain_reduction
# if config['parameters']['dimensions'] is not the same as self.dimensions,
# then filter out Xi,yi data. Will want to keep data. opt.Xi_all and opt.Yi_all.
# What if I use tell( list) here, will that naturally do the filtering?
return opt
######### BENCHMARKING ###############################
def calculate_distances(points, single_point):
"""
Calculate the Euclidean distances from a list of points to a single point.
:param points: List of points (each point is a list of coordinates).
:param single_point: A single point (list of coordinates).
:return: List of distances.
"""
# Convert lists to numpy arrays for efficient computation
points_array = np.array(points)
single_point_array = np.array(single_point)
diff = points_array - single_point_array # Calculate the differences and square them
squared_diff = diff ** 2
distances = np.sqrt(np.sum(squared_diff, axis=1)) # Sum the squared differences and take the square root
return distances
def multivariate_gaussian_signal(x, ndims=1, amplitude=1, sigma=0.1, noisetype=None, background_noise=None, N_gaussian = 1):
'''Simulates noisy data using multivariate distribution with covariance matrix.
Gives single peak. Shape of peak is given by covariance matrix.
For noise,
shot noise (poisson distribution),
gaussian noise proportional to sqrt(signal)/ N_gaussian. For example from adding several poissoning signals together.
constant gaussian noise with amplitude 'background noise'
Parameters :
x : list of points
ndims : 1-5, Number of dimensions
amplitude : amplitude of the gaussian, so max value.
sigma = 0.1 # standard deviation, FWHM = 2.35*sigma
noisetype : None / 'shot' / 'gaussian'
background_noise : None, or 1. constant gaussian noise amplitude.
N_gaussian : 10, if noisetype = 'gaussian', number of averages.
Output :
negative float. Neg so works with minimizer.
'''
# 1D Gaussian objective, shot noise
if 1:
# Peak at center with no covariance
mean = np.zeros(ndims) # Assuming the mean vector is zero
covariance_matrix = np.diag(sigma**2 * np.ones(ndims))
elif 0:
# Randomize means and covariance
mean = np.random.randn(ndims) # Assuming the mean vector is zero
A = np.random.randn(ndims, ndims)
covariance_matrix = sigma**2 * np.dot(A, A.T) # to make a positive semi-definite matrix, which is what a covariance matrix must be.
normalizing_factor = 1 / np.sqrt((2 * np.pi) ** ndims * np.linalg.det(covariance_matrix)) # normalization to make max 1
val = multivariate_normal(mean, covariance_matrix).pdf(x) / normalizing_factor
val = amplitude * val
if noisetype=='shot':
# Generate random number from Poisson distribution with mean of 'val'
n = 1 # Number of points to generate
mu = np.abs(val) # Mean of the Poisson distribution
val = np.random.poisson(mu, n)[0]
elif noisetype=='gaussian':
# Gaussian noise proportional to sqrt(signal)/N
# This for example would model the average of N shot noise signals, each with a max signal of amplitude.
val = val + np.sqrt(np.abs(val))/ N_gaussian * np.random.randn()
if background_noise is not None:
# Constant gaussian noise
val = val + background_noise * np.random.randn()
val = np.abs(val)
return -val # because minimizing
def ci_wilson(self, k, n, z=1.96, eqmode=0):
''' Calculates the Binomial Proportion Confidence Interval using the Wilson Score method without continuation correction
# Equations eqmode == 1 from: https://en.wikipedia.org/w/index.php?title=Binomial_proportion_confidence_interval&oldid=1101942017#Wilson_score_interval
# Equations eqmode == 0 from: https://www.evanmiller.org/how-not-to-sort-by-average-rating.html
# The results should be close to:
# from statsmodels.stats.proportion import proportion_confint
# proportion_confint(k, n, alpha=0.05, method='wilson')
#z=1.44 = 85%, 1.96 = 95% '''
if n == 0:
return 0
p_hat = float(k) / n #mean
z2 = z**2
if eqmode == 0:
ci_l = (p_hat + z2/(2*n) - z*np.sqrt(max(0.0, (p_hat*(1 - p_hat) + z2/(4*n))/n))) / (1 + z2 / n)
else:
ci_l = (1.0 / (1.0 + z2/n)) * (p_hat + z2/(2*n)) - (z / (1 + z2/n)) * np.sqrt(max(0.0, (p_hat*(1 - p_hat)/n + z2/(4*(n**2)))))
if eqmode == 0:
ci_u = (p_hat + z2/(2*n) + z*np.sqrt(max(0.0, (p_hat*(1 - p_hat) + z2/(4*n))/n))) / (1 + z2 / n)
else:
ci_u = (1.0 / (1.0 + z2/n)) * (p_hat + z2/(2*n)) + (z / (1 + z2/n)) * np.sqrt(max(0.0, (p_hat*(1 - p_hat)/n + z2/(4*(n**2)))))
return [ci_l, ci_u]
def binary_simulation(self, mu, Nruns):
''' Simulates binary data with Nruns of either 0 or 1 and mean mu.
mu: between 0 and 1
Nruns: number of runs
# Outputs simulated mean, Wilson 86% confint [lcb, ucb], and the max sigma.'''
data = np.random.binomial(1, mu, Nruns)
mean = np.mean(data)
num_ones = np.count_nonzero(data == 1)
confint = self.ci_wilson(num_ones,data.size, z=1.44) # 1.44 for 86% conf, 1.96 for 95%
sigma = np.max(abs(mean - confint)) # take max of lc and uc bounds
return mean, sigma, confint