diff --git a/src/lasdi/enums.py b/src/lasdi/enums.py index 33b9bd8..ce24936 100644 --- a/src/lasdi/enums.py +++ b/src/lasdi/enums.py @@ -1,13 +1,13 @@ from enum import Enum class NextStep(Enum): - Train = 1 - PickSample = 2 - RunSample = 3 - CollectSample = 4 + Train = 1 + PickSample = 2 + RunSample = 3 + CollectSample = 4 class Result(Enum): - Unexecuted = 1 - Success = 2 - Fail = 3 - Complete = 4 \ No newline at end of file + Unexecuted = 1 + Success = 2 + Fail = 3 + Complete = 4 \ No newline at end of file diff --git a/src/lasdi/gp.py b/src/lasdi/gp.py index 3af98ff..19e7763 100644 --- a/src/lasdi/gp.py +++ b/src/lasdi/gp.py @@ -1,80 +1,200 @@ -import numpy as np -from sklearn.gaussian_process.kernels import ConstantKernel, Matern, RBF -from sklearn.gaussian_process import GaussianProcessRegressor +import numpy as np +from sklearn.gaussian_process.kernels import ConstantKernel, Matern, RBF +from sklearn.gaussian_process import GaussianProcessRegressor -def fit_gps(X, Y): - ''' - Trains each GP given the interpolation dataset. - X: (n_train, n_param) numpy 2d array - Y: (n_train, n_coef) numpy 2d array + + +# ------------------------------------------------------------------------------------------------- +# Gaussian Process functions! +# ------------------------------------------------------------------------------------------------- + +def fit_gps(X : np.ndarray, Y : np.ndarray) -> list[GaussianProcessRegressor]: + """ + Trains a GP for each column of Y. If Y has shape N x k, then we train k GP regressors. In this + case, we assume that X has shape N x M. Thus, the Input to the GP is in \mathbb{R}^M. For each + k, we train a GP where the i'th row of X is the input and the i,k component of Y is the + corresponding target. Thus, we return a list of k GP Regressor objects, the k'th one of which + makes predictions for the k'th coefficient in the latent dynamics. + We assume each target coefficient is independent with each other. - gp_dictionnary is a dataset containing the trained GPs (as sklearn objects) - ''' - n_coef = 1 if (Y.ndim == 1) else Y.shape[1] + + ----------------------------------------------------------------------------------------------- + Arguments + ----------------------------------------------------------------------------------------------- + + X: A 2d numpy array of shape (n_train, input_dim), where n_train is the number of training + examples and input_dim is the number of components in each input (e.g., the number of + parameters) + + Y: A 2d numpy array of shape (n_train, n_coef), where n_train is the number of training + examples and n_coef is the number of coefficients in the latent dynamics. + + + + ----------------------------------------------------------------------------------------------- + Returns + ----------------------------------------------------------------------------------------------- + + A list of trained GP regressor objects. If Y has k columns, then the returned list has k + elements. It's i'th element holds a trained GP regressor object whose training inputs are the + columns of X and whose corresponding target values are the elements of the i'th column of Y. + """ + + # Determine the number of components (columns) of Y. Since this is a regression task, we will + # perform a GP regression fit on each component (column) of Y. + n_coef : int = 1 if (Y.ndim == 1) else Y.shape[1] + + # Transpose Y so that each row corresponds to a particular coefficient. This allows us to + # iterate over the coefficients by iterating through the rows of Y. 
if (n_coef > 1): Y = Y.T + # Sklearn requires X to be a 2D array... so make sure this holds. if X.ndim == 1: X = X.reshape(-1, 1) - gp_dictionnary = [] + # Initialize a list to hold the trained GP objects. + gp_list : list[GaussianProcessRegressor] = [] + # Cycle through the rows of Y (which we transposed... so this just cycles through the + # coefficients) for yk in Y: + # Make the kernel. # kernel = ConstantKernel() * Matern(length_scale_bounds = (0.01, 1e5), nu = 1.5) - kernel = ConstantKernel() * RBF(length_scale_bounds = (0.1, 1e5)) + kernel = ConstantKernel() * RBF(length_scale_bounds = (0.1, 1e5)) - gp = GaussianProcessRegressor(kernel = kernel, n_restarts_optimizer = 10, random_state = 1) - gp.fit(X, yk) + # Initialize the GP object. + gp = GaussianProcessRegressor(kernel = kernel, n_restarts_optimizer = 10, random_state = 1) - gp_dictionnary += [gp] + # Fit it to the data (train), then add it to the list of trained GPs + gp.fit(X, yk) + gp_list += [gp] - return gp_dictionnary + # All done! + return gp_list -def eval_gp(gp_dictionnary, param_grid): - ''' +def eval_gp(gp_list : list[GaussianProcessRegressor], param_grid : np.ndarray) -> tuple: + """ Computes the GPs predictive mean and standard deviation for points of the parameter space grid - ''' - n_coef = len(gp_dictionnary) + + ----------------------------------------------------------------------------------------------- + Arguments + ----------------------------------------------------------------------------------------------- + + gp_list: a list of trained GP regressor objects. The number of elements in this list should + match the number of columns in param_grid. The i'th element of this list is a GP regressor + object that predicts the i'th coefficient. + + param_grid: A 2d numpy.ndarray object of shape (number of parameter combination, number of + parameters). The i,j element of this array specifies the value of the j'th parameter in the + i'th combination of parameters. We use this as the testing set for the GP evaluation. + + + + ----------------------------------------------------------------------------------------------- + Returns + ----------------------------------------------------------------------------------------------- + + A two element tuple. Both are 2d numpy arrays of shape (number of parameter combinations, + number of coefficients). The two arrays hold the predicted means and std's for each parameter + at each training example, respectively. + + Thus, the i,j element of the first return variable holds the predicted mean of the j'th + coefficient in the latent dynamics at the i'th training example. Likewise, the i,j element of + the second return variable holds the standard deviation in the predicted distribution for the + j'th coefficient in the latent dynamics at the i'th combination of parameter values. + """ + + # Fetch the numbers coefficients. Since there is one GP Regressor per SINDy coefficient, this + # just the length of the gp_list. + n_coef : int = len(gp_list) + + # Fetch the number of parameters, make sure the grid is 2D. if (param_grid.ndim == 1): param_grid = param_grid.reshape(1, -1) n_points = param_grid.shape[0] + # Initialize arrays to hold the mean, STD. pred_mean, pred_std = np.zeros([n_points, n_coef]), np.zeros([n_points, n_coef]) - for k, gp in enumerate(gp_dictionnary): + + # Cycle through the GPs (one for each coefficient in the SINDy coefficients!). + for k, gp in enumerate(gp_list): + # Make predictions using the parameters in the param_grid. 
pred_mean[:, k], pred_std[:, k] = gp.predict(param_grid, return_std = True) + # All done! return pred_mean, pred_std -def sample_coefs(gp_dictionnary, param, n_samples): - ''' - Generates sample sets of ODEs for one given parameter. - coef_samples is a list of length n_samples, where each terms is a matrix of SINDy coefficients sampled from the GP predictive - distributions +def sample_coefs( gp_list : list[GaussianProcessRegressor], + param : np.ndarray, + n_samples : int): + """ + Generates sets of ODE (SINDy) coefficients sampled from the predictive distribution for those + coefficients at the specified parameter value (parma). Specifically, for the k'th SINDy + coefficient, we draw n_samples samples of the predictive distribution for the k'th coefficient + when param is the parameter. + + + + ----------------------------------------------------------------------------------------------- + Arguments + ----------------------------------------------------------------------------------------------- + + gp_list: a list of trained GP regressor objects. The number of elements in this list should + match the number of columns in param_grid. The i'th element of this list is a GP regressor + object that predicts the i'th coefficient. + + param: A combination of parameter values. i.e., a single test example. We evaluate each GP in + the gp_list at this parameter value (getting a prediction for each coefficient). + + n_samples: Number of samples of the predicted latent dynamics used to build ensemble of fom + predictions. N_s in the paper. + + + + ----------------------------------------------------------------------------------------------- + Returns + ----------------------------------------------------------------------------------------------- + + A 2d numpy ndarray object called coef_samples. It has shape (n_samples, n_coef), where n_coef + is the number of coefficients (length of gp_list). The i,j element of this list is the i'th + sample of the j'th SINDy coefficient. + """ - ''' + # Fetch the number of coefficients (since there is one GP Regressor per coefficient, this is + # just the length of the gp_list). + n_coef : int = len(gp_list) - n_coef = len(gp_dictionnary) - coef_samples = np.zeros([n_samples, n_coef]) + # Initialize an array to hold the coefficient samples. + coef_samples : np.ndarray = np.zeros([n_samples, n_coef]) + # Make sure param is a 2d array with one row, we need this when evaluating the GP Regressor + # object. if param.ndim == 1: param = param.reshape(1, -1) - n_points = param.shape[0] + + # Make sure we only have a single sample. + n_points : int = param.shape[0] assert(n_points == 1) - pred_mean, pred_std = eval_gp(gp_dictionnary, param) + # Evaluate the predicted mean and std at the parameter value. + pred_mean, pred_std = eval_gp(gp_list, param) pred_mean, pred_std = pred_mean[0], pred_std[0] + # Cycle through the samples and coefficients. For each sample of the k'th coefficient, we draw + # a sample from the normal distribution with mean pred_mean[k] and std pred_std[k]. for s in range(n_samples): for k in range(n_coef): coef_samples[s, k] = np.random.normal(pred_mean[k], pred_std[k]) + # All done! 
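    # Side note (illustrative only; the loop above is what the code actually uses): since
    # pred_mean and pred_std are 1d arrays of length n_coef, an equivalent set of samples could
    # be drawn in a single vectorized call,
    #
    #   coef_samples = np.random.normal(loc = pred_mean, scale = pred_std,
    #                                   size = (n_samples, n_coef))
    #
    # because numpy broadcasts the per-coefficient means and standard deviations across the
    # n_samples rows.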
return coef_samples \ No newline at end of file diff --git a/src/lasdi/gplasdi.py b/src/lasdi/gplasdi.py index ac3d046..5131f24 100644 --- a/src/lasdi/gplasdi.py +++ b/src/lasdi/gplasdi.py @@ -138,7 +138,7 @@ def __init__(self, physics, autoencoder, latent_dynamics, param_space, config): else: self.device = 'cpu' - self.best_loss = np.Inf + self.best_loss = np.inf self.best_coefs = None self.restart_iter = 0 diff --git a/src/lasdi/inputs.py b/src/lasdi/inputs.py index 0a654b6..fa030bb 100644 --- a/src/lasdi/inputs.py +++ b/src/lasdi/inputs.py @@ -1,67 +1,159 @@ +# ------------------------------------------------------------------------------------------------- +# Imports +# ------------------------------------------------------------------------------------------------- + from warnings import warn -verbose = False +verbose : bool = False + + +# ------------------------------------------------------------------------------------------------- +# Input Parser class +# ------------------------------------------------------------------------------------------------- class InputParser: - dict_ = None - name = "" + """ + A InputParser objects acts as a wrapper around a dictionary of settings. Thus, each setting is + a key and the corresponding value is the setting's value. Because one setting may itself be + a dictionary (we often group settings; each group has a name but several constituent settings), + the underlying dictionary is structured as a sequence of nested dictionaries. This class allows + the user to select a specific setting from that structure by specifying (via a list of strings) + where in that nested structure the desired setting lives. + """ + dict_ : dict = None + name : str = "" + + + + def __init__(self, dict : dict, name : str = "") -> None: + """" + Initializes an InputParser object by setting the underlying dictionary of settings as an + attribute. + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + dict: The dictionary of settings. To avoid the risk of the user accidentally changing one + of the settings after wrapping it, we store a deep copy of dict in self. + """ - def __init__(self, dict, name = ""): + # A shallow copy could cause issues if the user changes dict's keys/values after + # initializing this object. We store a deep copy to avoid this risk. from copy import deepcopy self.dict_ = deepcopy(dict) + self.name = name - return - def getInput(self, keys, fallback=None, datatype=None): + + + def getInput(self, keys : list, fallback = None, datatype = None): ''' - Find the value corresponding to the list of keys. - If the specified keys do not exist, use the fallback value. - If the fallback value does not exist, returns an error. - If the datatype is specified, enforce the output value has the right datatype. + A InputParser object acts as a wrapper around a dictionary of settings. That is, self.dict_ + is structured as a nested family of dictionaries. Each setting corresponds to a key in + self.dict_. The setting's value is the corresponding value in self.dict_. In many cases, + a particular setting may be nested within others. That is, a setting's value may itself be + another dictionary housing various sub-settings. This function allows us to fetch a + specific setting from this nested structure. + + Specifically, we specify a list of strings. 
keys[0] should be a key in self.dict_ + If so, we set val = self.dict_[keys[0]]. If there are more keys, then val should be a + dictionary and keys[1] should be a key in this dictionary. In this case, we replace val + with val[key[1]] and so on. This continues until we have exhausted all keys. There is one + important exception: + + If at some point in the process, there are more keys but val is not a dictionary, or if + there are more keys and val is a dictionary but the next key is not a key in that + dictionary, then we return the fallback value. If the fallback value does not exist, + returns an error. + + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + keys: A list of keys we want to fetch from self.dict. keys[0] should be a key in self.dict_ + If so, we set val = self.dict_[keys[0]]. If there are more keys, then val should be a + dictionary and keys[1] should be a key in this dictionary. In this case, we replace val + with val[key[1]] and so on. This continues until we have exhausted all keys. + + fallback: A sort of default value. If at some point, val is not a dictionary (and there are + more keys) or val is a dictionary but the next key is not a valid key in that dictionary, + then we return the fallback value. + + datatype: If not None, then we require that the final val has this datatype. If the final + val does not have the desired datatype, we raise an exception. + + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + The final val value as outlined by the process described above. ''' + + # Concatenate the keys together. This is for debugging purposes. keyString = "" for key_ in keys: keyString += key_ + "/" + # Begin by initializing val to self.dict_ val = self.dict_ + + # Cycle through the keys for key in keys: + + # Check if the current key is a key in val (this assumes val is a dictionary). If so, + # update val. Otherwise, return the fallback (if it is present) or raise an exception. if key in val: val = val[key] elif (fallback != None): return fallback else: raise RuntimeError("%s does not exist in the input dictionary %s!" % (keyString, self.name)) - + + # Check if the fallback and final val have the same type. if (fallback != None): if (type(val) != type(fallback)): warn("%s does not match the type with the fallback value %s!" % (str(type(val)), str(type(fallback)))) - + + # Check thast the final val matches the desired datatype if (datatype != None): if (type(val) != datatype): raise RuntimeError("%s does not match the specified datatype %s!" % (str(type(val)), str(datatype))) else: if verbose: warn("InputParser Warning: datatype is not checked.\n key: %s\n value type: %s" % (keys, type(val))) + + # All done! return val -def getDictFromList(list_, inputDict): - ''' - get a dict with {key: val} from a list of dicts - NOTE: it returns only the first item in the list, - even if the list has more than one dict with {key: val}. - ''' - dict_ = None - for item in list_: - isDict = True - for key, val in inputDict.items(): - if key not in item: - isDict = False - break - if (item[key] != val): - isDict = False - break - if (isDict): - dict_ = item - break - if (dict_ == None): - raise RuntimeError('Given list does not have a dict with {%s: %s}!' 
% (key, val)) - return dict_ + + +# ------------------------------------------------------------------------------------------------- +# Unused: getDictFromList function +# ------------------------------------------------------------------------------------------------- + +#def getDictFromList(list_, inputDict): +# ''' +# get a dict with {key: val} from a list of dicts +# NOTE: it returns only the first item in the list, +# even if the list has more than one dict with {key: val}. +# ''' +# dict_ = None +# for item in list_: +# isDict = True +# for key, val in inputDict.items(): +# if key not in item: +# isDict = False +# break +# if (item[key] != val): +# isDict = False +# break +# if (isDict): +# dict_ = item +# break +# if (dict_ == None): +# raise RuntimeError('Given list does not have a dict with {%s: %s}!' % (key, val)) +# return dict_ \ No newline at end of file diff --git a/src/lasdi/latent_space.py b/src/lasdi/latent_space.py index fdf8fef..2e78542 100644 --- a/src/lasdi/latent_space.py +++ b/src/lasdi/latent_space.py @@ -1,175 +1,481 @@ -import torch -import numpy as np +# ------------------------------------------------------------------------------------------------- +# Imports and setup +# ------------------------------------------------------------------------------------------------- + +import torch +import numpy as np + +from .physics import Physics # activation dict -act_dict = {'ELU': torch.nn.ELU, - 'hardshrink': torch.nn.Hardshrink, - 'hardsigmoid': torch.nn.Hardsigmoid, - 'hardtanh': torch.nn.Hardtanh, - 'hardswish': torch.nn.Hardswish, - 'leakyReLU': torch.nn.LeakyReLU, - 'logsigmoid': torch.nn.LogSigmoid, - 'multihead': torch.nn.MultiheadAttention, - 'PReLU': torch.nn.PReLU, - 'ReLU': torch.nn.ReLU, - 'ReLU6': torch.nn.ReLU6, - 'RReLU': torch.nn.RReLU, - 'SELU': torch.nn.SELU, - 'CELU': torch.nn.CELU, - 'GELU': torch.nn.GELU, - 'sigmoid': torch.nn.Sigmoid, - 'SiLU': torch.nn.SiLU, - 'mish': torch.nn.Mish, - 'softplus': torch.nn.Softplus, - 'softshrink': torch.nn.Softshrink, - 'tanh': torch.nn.Tanh, - 'tanhshrink': torch.nn.Tanhshrink, - 'threshold': torch.nn.Threshold, - } - -def initial_condition_latent(param_grid, physics, autoencoder): - - ''' - - Outputs the initial condition in the latent space: Z0 = encoder(U0) - - ''' - - n_param = param_grid.shape[0] - Z0 = [] - - sol_shape = [1, 1] + physics.qgrid_size +act_dict = {'ELU' : torch.nn.ELU, + 'hardshrink' : torch.nn.Hardshrink, + 'hardsigmoid' : torch.nn.Hardsigmoid, + 'hardtanh' : torch.nn.Hardtanh, + 'hardswish' : torch.nn.Hardswish, + 'leakyReLU' : torch.nn.LeakyReLU, + 'logsigmoid' : torch.nn.LogSigmoid, + 'PReLU' : torch.nn.PReLU, + 'ReLU' : torch.nn.ReLU, + 'ReLU6' : torch.nn.ReLU6, + 'RReLU' : torch.nn.RReLU, + 'SELU' : torch.nn.SELU, + 'CELU' : torch.nn.CELU, + 'GELU' : torch.nn.GELU, + 'sigmoid' : torch.nn.Sigmoid, + 'SiLU' : torch.nn.SiLU, + 'mish' : torch.nn.Mish, + 'softplus' : torch.nn.Softplus, + 'softshrink' : torch.nn.Softshrink, + 'tanh' : torch.nn.Tanh, + 'tanhshrink' : torch.nn.Tanhshrink, + 'threshold' : torch.nn.Threshold,} + + + +# ------------------------------------------------------------------------------------------------- +# initial_conditions_latent function +# ------------------------------------------------------------------------------------------------- + +def initial_condition_latent(param_grid : np.ndarray, + physics : Physics, + autoencoder : torch.nn.Module) -> list[np.ndarray]: + """ + This function maps a set of initial conditions for the fom to initial conditions for the + 
latent space dynamics. Specifically, we take in a set of possible parameter values. For each + set of parameter values, we recover the fom IC (from physics), then map this fom IC to a + latent space IC (by encoding it using the autoencoder). We do this for each parameter + combination and then return a list housing the latent space ICs. + + + ----------------------------------------------------------------------------------------------- + Arguments + ----------------------------------------------------------------------------------------------- + + param_grid: A 2d numpy.ndarray object of shape (number of parameter combination) x (number of + parameters). + + physics: A "Physics" object that stores the datasets for each parameter combination. + + autoencoder: The actual autoencoder object that we use to map the ICs into the latent space. + + + ----------------------------------------------------------------------------------------------- + Returns + ----------------------------------------------------------------------------------------------- + + A list of numpy ndarray objects whose i'th element holds the latent space initial condition + for the i'th set of parameters in the param_grid. That is, if we let U0_i denote the fom IC for + the i'th set of parameters, then the i'th element of the returned list is Z0_i = encoder(U0_i). + """ + # Figure out how many parameters we are testing at. + n_param : int = param_grid.shape[0] + Z0 : list[np.ndarray] = [] + sol_shape : list[int] = [1, 1] + physics.qgrid_size + + # Cycle through the parameters. for i in range(n_param): - u0 = physics.initial_condition(param_grid[i]) + # TODO(kevin): generalize parameter class. + + # Fetch the IC for the i'th set of parameters. Then map it to a tensor. + u0 : np.ndarray = physics.initial_condition(param_grid[i]) u0 = u0.reshape(sol_shape) u0 = torch.Tensor(u0) + + # Encode the IC, then map the encoding to a numpy array. z0 = autoencoder.encoder(u0) z0 = z0[0, 0, :].detach().numpy() + + # Append the new IC to the list of latent ICs Z0.append(z0) + # Return the list of latent ICs. return Z0 + + +# ------------------------------------------------------------------------------------------------- +# MLP class +# ------------------------------------------------------------------------------------------------- + class MultiLayerPerceptron(torch.nn.Module): + def __init__( self, + layer_sizes : list[int], + act_type : str = 'sigmoid', + reshape_index : int = None, + reshape_shape : tuple[int] = None, + threshold : float = 0.1, + value : float = 0.0) -> None: + """ + This class defines a standard multi-layer network network. - def __init__(self, layer_sizes, - act_type='sigmoid', reshape_index=None, reshape_shape=None, - threshold=0.1, value=0.0, num_heads=1): - super(MultiLayerPerceptron, self).__init__() - # including input, hidden, output layers - self.n_layers = len(layer_sizes) - self.layer_sizes = layer_sizes + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- - # Linear features between layers - self.fcs = [] - for k in range(self.n_layers-1): - self.fcs += [torch.nn.Linear(layer_sizes[k], layer_sizes[k + 1])] - self.fcs = torch.nn.ModuleList(self.fcs) - self.init_weight() + layer_sizes: A list of integers specifying the widths of the layers (including the + dimensionality of the domain of each layer, as well as the co-domain of the final layer). 
+ Suppose this list has N elements. Then the network will have N - 1 layers. The i'th layer + maps from \mathbb{R}^{layer_sizes[i]} to \mathbb{R}^{layers_sizes[i]}. Thus, the i'th + element of this list represents the domain of the i'th layer AND the co-domain of the + i-1'th layer. + + act_type: A string specifying which activation function we want to use at the end of each + layer (except the final one). We use the same activation for each layer. + + reshape_index: This argument specifies if we should reshape the network's input or output + (or neither). If the user specifies reshape_index, then it must be either 0 or -1. Further, + in this case, they must also specify reshape_shape (you need to specify both together). If + it is 0, then reshape_shape specifies how we reshape the input before passing it through + the network (the input to the first layer). If reshape_index is -1, then reshape_shape + specifies how we reshape the network output before returning it (the output to the last + layer). + + reshape_shape: These are lists of k integers specifying the final k dimensions of the shape + of the input to the first layer (if reshape_index == 0) or the output of the last layer + (if reshape_index == -1). You must specify this argument if and only if you specify + reshape_index. + + threshold: You should only specify this argument if you are using the "Threshold" + activation type. In this case, it specifies the threshold value for that activation (if x + is the input and x > threshold, then the function returns x. Otherwise, it returns the + value). - # Reshape input or output layer + value: You should only specify this argument if you are using the "Threshold" activation + type. In this case, "value" specifies the value that the function returns if the input is + below the threshold. + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + Nothing! + """ + + # Run checks. + # Make sure the reshape index is either 0 (input to 1st layer) or -1 (output of last + # layer). Also make sure that that product of the dimensions in the reshape_shape match + # the input dimension for the 1st layer (if reshape_index == 0) or the output dimension of + # the last layer (if reshape_index == 1). + # + # Why do we need the later condition? Well, suppose that reshape_shape has a length of k. + # If reshape_index == 0, then we squeeze the final k dimensions of the input and feed that + # into the first layer. Thus, in this case, we need the last dimension of the squeezed + # array to match the input domain of the first layer. On the other hand, reshape_index == -1 + # then we reshape the final dimension of the output to match the reshape_shape. Thus, in + # both cases, we need the product of the components of reshape_shape to match a + # corresponding element of layer_sizes. assert((reshape_index is None) or (reshape_index in [0, -1])) assert((reshape_shape is None) or (np.prod(reshape_shape) == layer_sizes[reshape_index])) - self.reshape_index = reshape_index - self.reshape_shape = reshape_shape - # Initalize activation function - self.act_type = act_type - self.use_multihead = False + super(MultiLayerPerceptron, self).__init__() + + # Note that layer_sizes specifies the dimensionality of the domains and co-domains of each + # layer. 
Specifically, the i'th element specifies the input dimension of the i'th layer, + # while the final element specifies the dimensionality of the co-domain of the final layer. + # Thus, the number of layers is one less than the length of layer_sizes. + self.n_layers : int = len(layer_sizes) - 1; + self.layer_sizes : list[int] = layer_sizes + + # Set up the affine parts of the layers. + self.layers : list[torch.nn.Module] = []; + for k in range(self.n_layers): + self.layers += [torch.nn.Linear(layer_sizes[k], layer_sizes[k + 1])] + self.layers = torch.nn.ModuleList(self.layers) + + # Now, initialize the weight matrices and bias vectors in the affine portion of each layer. + self.init_weight() + + # Reshape input to the 1st layer or output of the last layer. + self.reshape_index : int = reshape_index + self.reshape_shape : list[int] = reshape_shape + + # Set up the activation function. Note that we need to handle the threshold activation type + # separately because it requires an extra set of parameters to initialize. + self.act_type : str = act_type if act_type == "threshold": - self.act = act_dict[act_type](threshold, value) - - elif act_type == "multihead": - self.use_multihead = True - if (self.n_layers > 3): # if you have more than one hidden layer - self.act = [] - for i in range(self.n_layers-2): - self.act += [act_dict[act_type](layer_sizes[i+1], num_heads)] - else: - self.act = [torch.nn.Identity()] # No additional activation - self.act = torch.nn.ModuleList(self.fcs) - - #all other activation functions initialized here + self.act : torch.nn.Module = act_dict[act_type](threshold, value) else: - self.act = act_dict[act_type]() + self.act : torch.nn.Module = act_dict[act_type]() + + # All done! return - def forward(self, x): + + + def forward(self, x : torch.Tensor) -> torch.Tensor: + """ + This function defines the forward pass through self. + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + x: A tensor holding a batch of inputs. We pass this tensor through the network's layers + and then return the result. If self.reshape_index == 0 and self.reshape_shape has k + elements, then the final k elements of x's shape must match self.reshape_shape. + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + The image of x under the network's layers. If self.reshape_index == -1 and + self.reshape_shape has k elements, then we reshape the output so that the final k elements + of its shape match those of self.reshape_shape. + """ + + # If the reshape_index is 0, we need to reshape x before passing it through the first + # layer. if (self.reshape_index == 0): - # make sure the input has a proper shape + # Make sure the input has a proper shape. There is a lot going on in this line; let's + # break it down. If reshape_index == 0, then we need to reshape the input, x, before + # passing it through the layers. Let's assume that reshape_shape has k elements. Then, + # we need to squeeze the final k dimensions of the input, x, so that the resulting + # tensor has a final dimension size that matches the input dimension size for the first + # layer. The check below makes sure that the final k dimensions of the input, x, match + # the stored reshape_shape. 
assert(list(x.shape[-len(self.reshape_shape):]) == self.reshape_shape) - # we use torch.Tensor.view instead of torch.Tensor.reshape, - # in order to avoid data copying. + + # Now that we know the final k dimensions of x have the correct shape, let's squeeze + # them into 1 dimension (so that we can pass the squeezed tensor through the first + # layer). To do this, we reshape x by keeping all but the last k dimensions of x, and + # replacing the last k with a single dimension whose size matches the dimensionality of + # the domain of the first layer. Note that we use torch.Tensor.view instead of + # torch.Tensor.reshape in order to avoid data copying. x = x.view(list(x.shape[:-len(self.reshape_shape)]) + [self.layer_sizes[self.reshape_index]]) - for i in range(self.n_layers-2): - x = self.fcs[i](x) # apply linear layer - if (self.use_multihead): - x = self.apply_attention(self, x, i) - else: - x = self.act(x) + # Pass x through the network layers (except for the final one, which has no activation + # function). + for i in range(self.n_layers - 1): + x : torch.Tensor = self.layers[i](x) # apply linear layer + x : torch.Tensor = self.act(x) # apply activation - x = self.fcs[-1](x) + # Apply the final (output) layer. + x = self.layers[-1](x) + # If the reshape_index is -1, then we need to reshape the output before returning. if (self.reshape_index == -1): - # we use torch.Tensor.view instead of torch.Tensor.reshape, - # in order to avoid data copying. + # In this case, we need to split the last dimension of x, the output of the final + # layer, to match the reshape_shape. This is precisely what the line below does. Note + # that we use torch.Tensor.view instead of torch.Tensor.reshape in order to avoid data + # copying. x = x.view(list(x.shape[:-1]) + self.reshape_shape) + # All done! return x - def apply_attention(self, x, act_idx): - x = x.unsqueeze(1) # Add sequence dimension for attention - x, _ = self.act[act_idx](x, x, x) # apply attention - x = x.squeeze(1) # Remove sequence dimension - return x - - def init_weight(self): + + + def init_weight(self) -> None: + """ + This function initializes the weight matrices and bias vectors in self's layers. + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + None! + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + Nothing! + """ + # TODO(kevin): support other initializations? - for fc in self.fcs: - torch.nn.init.xavier_uniform_(fc.weight) + for layer in self.layers: + torch.nn.init.xavier_uniform_(layer.weight) + torch.nn.init.zeros_(layer.bias) + + # All done! return + + +# ------------------------------------------------------------------------------------------------- +# Autoencoder class +# ------------------------------------------------------------------------------------------------- + class Autoencoder(torch.nn.Module): + def __init__(self, physics : Physics, config : dict) -> None: + """ + Initializes an Autoencoder object. An Autoencoder consists of two networks, an encoder, + E : \mathbb{R}^F -> \mathbb{R}^L, and a decoder, D : \mathbb{R}^L -> \marthbb{R}^F. We + assume that the dataset consists of samples of a parameterized L-manifold in + \mathbb{R}^F. 
The idea then is that E and D act like the inverse coordinate patch and + coordinate patch, respectively. In our case, E and D are trainable neural networks. We + try to train E and map data in \mathbb{R}^F to elements of a low dimensional latent + space (\mathbb{R}^L) which D can send back to the original data. (thus, E, and D should + act like inverses of one another). + + The Autoencoder class implements this model as a trainable torch.nn.Module object. + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + physics: A "Physics" object that holds the fom solution frames. We use this object to + determine the shape of each fom solution frame. Recall that each Physics object has a + corresponding PDE. We + + config: A dictionary representing the loaded .yml configuration file. We expect it to have + the following keys/: + hidden_units: A list of integers specifying the dimension of the co-domain of each + encoder layer except for the final one. Thus, if the k'th layer maps from + \mathbb{R}^{n(k)} to \mathbb{R}^{n(k + 1)} and there are K layers (indexed 0, 1, ... , + K - 1), then hidden_units should specify n(1), ... , n(K - 1). + + latent_dimension: The dimensionality of the Autoencoder's latent space. Equivalently, + the dimensionality of the co-domain of the encoder (i.e., the dimensionality of the + co-domain of the last layer of the encoder) and the domain of the decoder (i.e., the + dimensionality of the domain of the first layer of the decoder). + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + Nothing! + """ - def __init__(self, physics, config): super(Autoencoder, self).__init__() - self.qgrid_size = physics.qgrid_size - self.space_dim = np.prod(self.qgrid_size) - hidden_units = config['hidden_units'] - n_z = config['latent_dimension'] - self.n_z = n_z - - layer_sizes = [self.space_dim] + hidden_units + [n_z] - #grab relevant initialization values from config - act_type = config['activation'] if 'activation' in config else 'sigmoid' - threshold = config["threshold"] if "threshold" in config else 0.1 - value = config["value"] if "value" in config else 0.0 - num_heads = config['num_heads'] if 'num_heads' in config else 1 - - self.encoder = MultiLayerPerceptron(layer_sizes, act_type, - reshape_index=0, reshape_shape=self.qgrid_size, - threshold=threshold, value=value, num_heads=num_heads) - - self.decoder = MultiLayerPerceptron(layer_sizes[::-1], act_type, - reshape_index=-1, reshape_shape=self.qgrid_size, - threshold=threshold, value=value, num_heads=num_heads) + # A Physics object's qgrid_size is a list of integers specifying the shape of each frame of + # the fom solution. If the solution is scalar valued, then this is just a list whose i'th + # element specifies the number of grid points along the i'th spatial axis. If the solution + # is vector valued, however, we prepend the dimensionality of the vector field to the list + # from the scalar list (so the 0 element represents the dimension of the vector field at + # each point). + self.qgrid_size : list[int] = physics.qgrid_size; + + # The product of the elements of qgrid_size is the number of dimensions in each fom + # solution frame. 
This number represents the dimensionality of the input to the encoder + # (since we pass a flattened fom frame as input). + self.space_dim : np.ndarray = np.prod(self.qgrid_size); + + # Fetch information about the domain/co-domain of each encoder layer. + hidden_units : list[int] = config['hidden_units']; + n_z : int = config['latent_dimension']; + self.n_z : int = n_z; + + # Build the "layer_sizes" argument for the MLP class. This consists of the dimensions of + # each layers' domain + the dimension of the co-domain of the final layer. + layer_sizes = [self.space_dim] + hidden_units + [n_z]; + # Use the settings to set up the activation information for the encoder. + act_type = config['activation'] if 'activation' in config else 'sigmoid' + threshold = config["threshold"] if "threshold" in config else 0.1 + value = config["value"] if "value" in config else 0.0 + + # Now, build the encoder. + self.encoder = MultiLayerPerceptron( + layer_sizes = layer_sizes, + act_type = act_type, + reshape_index = 0, # We need to flatten the spatial dimensions of each fom frame. + reshape_shape = self.qgrid_size, + threshold = threshold, + value = value); + + self.decoder = MultiLayerPerceptron( + layer_sizes = layer_sizes[::-1], # Reverses the order of the the list. + act_type = act_type, + reshape_index = -1, + reshape_shape = self.qgrid_size, # We need to reshape the network output to a fom frame. + threshold = threshold, + value = value) + + # All done! return - def forward(self, x): - x = self.encoder(x) - x = self.decoder(x) - return x + def forward(self, x : torch.Tensor) -> torch.Tensor: + """ + This function defines the forward pass through self. + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + x: A tensor holding a batch of inputs. We pass this tensor through the encoder + decoder + and then return the result. + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + The image of x under the encoder + decoder. + """ + + # Encoder the input + z : torch.Tensor = self.encoder(x) + + # Now decode z. + y : torch.Tensor = self.decoder(z) + + # All done! Hopefully y \approx x. + return y - def export(self): - dict_ = {'autoencoder_param': self.cpu().state_dict()} + + + def export(self) -> dict: + """ + This function extracts self's parameters and returns them in a dictionary. + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + None! + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + The A dictionary housing self's state dictionary. + """ + + # TO DO: deep export which includes all information needed to re-initialize self from + # scratch. This would probably require changing the initializer. + + dict_ = { 'autoencoder_param' : self.cpu().state_dict()} return dict_ - def load(self, dict_): + + + def load(self, dict_ : dict) -> None: + """ + This function loads self's state dictionary. 
+ + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + dict_: This should be a dictionary with the key "autoencoder_param" whose corresponding + value is the state dictionary of an autoencoder which has the same architecture (i.e., + layer sizes) as self. + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + Nothing! + """ + self.load_state_dict(dict_['autoencoder_param']) return \ No newline at end of file diff --git a/src/lasdi/param.py b/src/lasdi/param.py index a8a2223..208a311 100644 --- a/src/lasdi/param.py +++ b/src/lasdi/param.py @@ -1,83 +1,310 @@ -import numpy as np -from .inputs import InputParser +# ------------------------------------------------------------------------------------------------- +# Imports +# ------------------------------------------------------------------------------------------------- -def get_1dspace_from_list(config): - Nx = len(config['list']) - paramRange = np.array(config['list']) +import numpy as np +from .inputs import InputParser + + + +# ------------------------------------------------------------------------------------------------- +# Helper functions +# ------------------------------------------------------------------------------------------------- + +def get_1dspace_from_list(param_dict : dict) -> tuple[int, np.ndarray]: + """ + This function generates the parameter range (set of possible parameter values) for a parameter + that uses the list type test space. That is, "test_space_type" should be a key for the + parameter dictionary and the corresponding value should be "list". The parameter dictionary + should also have a "list" key whose value is a list of the possible parameter values. + + We parse this list and turn it into a numpy ndarray. + + + ----------------------------------------------------------------------------------------------- + Arguments + ----------------------------------------------------------------------------------------------- + + param_dict: A dictionary specifying one of the parameters. We should fetch this from the + configuration yaml file. It must have a "list" key whose corresponding value is a list of + floats. + + + ----------------------------------------------------------------------------------------------- + Returns + ----------------------------------------------------------------------------------------------- + + Two arguments: Nx and paramRange. paramRange is a 1d numpy ndarray (whose ith value is the + i'th element of param_dict["list"]). Nx is the length of paramRange. + """ + + # In this case, the parameter dictionary should have a "list" attribute which should list the + # parameter values we want to test. Fetch it (it's length is Nx) and use it to generate an + # array of possible values. + Nx : int = len(param_dict['list']) + paramRange : np.ndarray = np.array(param_dict['list']) + + # All done! 
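    # As a purely hypothetical illustration (these values are not taken from any shipped
    # configuration file): a param_dict such as
    #
    #   {'name' : 'a', 'test_space_type' : 'list', 'list' : [0.5, 1.0, 1.5]}
    #
    # would give Nx = 3 and paramRange = np.array([0.5, 1.0, 1.5]).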
return Nx, paramRange -def create_uniform_1dspace(config): - Nx = config['sample_size'] - minval = config['min'] - maxval = config['max'] - if (config['log_scale']): - paramRange = np.exp(np.linspace(np.log(minval), np.log(maxval), Nx)) + + +def create_uniform_1dspace(param_dict : dict) -> tuple[int, np.ndarray]: + """ + This function generates the parameter range (set of possible parameter values) for a parameter + that uses the uniform type test space. That is, "test_space_type" should be a key for the + parameter dictionary and the corresponding value should be "uniform". The parameter dictionary + should also have the following keys: + "min" + "max" + "sample_size" + "log_scale" + "min" and "max" specify the minimum and maximum value of the parameter, respectively. + "sample_size" specifies the number of parameter values we generate. Finally, log_scale, if + true, specifies if we should use a uniform or logarithmic spacing between samples of the + parameter. + + The values corresponding to "min" and "max" should be floats while the values corresponding to + "sample_size" and "log_scale" should be an int and a bool, respectively. + + + ----------------------------------------------------------------------------------------------- + Arguments + ----------------------------------------------------------------------------------------------- + + param_dict: A dictionary specifying one of the parameters. We should fetch this from the + configuration yaml file. It must have a "min", "max", "sample_size", and "log_scale" + keys (see above). + + + ----------------------------------------------------------------------------------------------- + Returns + ----------------------------------------------------------------------------------------------- + + Two arguments: Nx and paramRange. paramRange is a 1d numpy ndarray (whose ith value is the + i'th possible value of the parameter. Thus, paramRange[0] = param_dict["min"] and + paramRange[-1] = param_dict["max"]). Nx is the length of paramRange or, equivalently + param_dict["sample_size"]. + """ + + # Fetch the number of samples and the min/max value for this parameter. + Nx : int = param_dict['sample_size'] + minval : float = param_dict['min'] + maxval : float = param_dict['max'] + + # Generate the range of parameter values. Note that we have to generate a uniform grid in the + # log space, then exponentiate it to generate logarithmic spacing. + if (param_dict['log_scale']): + paramRange : np.ndarray = np.exp(np.linspace(np.log(minval), np.log(maxval), Nx)) else: - paramRange = np.linspace(minval, maxval, Nx) + paramRange : np.ndarray = np.linspace(minval, maxval, Nx) + + # All done! return Nx, paramRange -getParam1DSpace = {'list': get_1dspace_from_list, - 'uniform': create_uniform_1dspace} + + +# A macro that allows us to switch function we use to generate generate a parameter's range. 
+getParam1DSpace : dict[str, callable] = {'list' : get_1dspace_from_list, + 'uniform' : create_uniform_1dspace} + + + +# ------------------------------------------------------------------------------------------------- +# ParameterSpace Class +# ------------------------------------------------------------------------------------------------- class ParameterSpace: - param_list = [] - param_name = [] - n_param = 0 - train_space = None - test_space = None - n_init = 0 - test_grid_sizes = [] - test_meshgrid = None - - def __init__(self, config): - assert('parameter_space' in config) - parser = InputParser(config['parameter_space'], name="param_space_input") - - self.param_list = parser.getInput(['parameters'], datatype=list) - self.n_param = len(self.param_list) - - self.param_name = [] + # Initialize class variables + param_list : list[dict] = [] # A list housing the parameter dictionaries (from the yml file) + param_name : list[str] = [] # A list housing the parameter names. + n_param : int = 0 # The number of parameters. + train_space : np.ndarray = None # A 2D array of shape (n_train, n_param) whose i,j element is the j'th parameter value in the i'th combination of training parameters. + test_space : np.ndarray = None # A 2D array of shape (n_test, n_param) whose i,j element is the j'th parameter value in the i'th combination of testing parameters. + n_init : int = 0 # The number of combinations of parameters in the training set??? + test_grid_sizes : list[int] = [] # A list whose i'th element is the number of different values of the i'th parameter in the test instances. + test_meshgrid : tuple[np.ndarray] = None + + + + def __init__(self, config : dict) -> None: + """ + Initializes a ParameterSpace object using the settings passed in the conf dictionary (which + should have been read from a yaml file). + + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + config: This is a dictionary that houses the settings we want to use to run the code. This + should have been read from a yaml file. We assume it contains the following keys. If one + or more keys are tabbed over relative to one key above them, then the one above is a + dictionary and the ones below should be keys within that dictionary. + - parameter_space + - parameters (this should have at least one parameter defined!) + - test_space + - type (should be "grid") + """ + + # Make sure the configuration dictionary has a "parameter_space" setting. This should house + # information about which variables are present in the code, as well as how we want to test + # the various possible parameter values. + assert('parameter_space' in config); + + # Load the parameter_space settings into an InputParser object (which makes it easy to + # fetch setting values). We then fetch the list of parameters. Each parameters has a name, + # min and max, and information on how many instances we want. + parser = InputParser(config['parameter_space'], name = "param_space_input") + self.param_list : list[dict] = parser.getInput(['parameters'], datatype = list) + + # Fetch the parameter names. + self.param_name : list[str] = [] for param in self.param_list: - self.param_name += [param['name']] + self.param_name += [param['name']]; - self.train_space = self.createInitialTrainSpace(self.param_list) - self.n_init = self.train_space.shape[0] + # First, let's fetch the set of possible parameter values. 
This yields a 2^k x k matrix, + # where k is the number of parameters. The i,j entry of this matrix gives the value of the + # j'th parameter on the i'th instance. + self.train_space = self.createInitialTrainSpace(self.param_list) + self.n_init = self.train_space.shape[0] - test_space_type = parser.getInput(['test_space', 'type'], datatype=str) + # Next, let's make a set of possible parameter values to test at. + test_space_type = parser.getInput(['test_space', 'type'], datatype = str) if (test_space_type == 'grid'): + # Generate the set possible parameter combinations. See the docstring for + # "createTestGridSpace" for details. self.test_grid_sizes, self.test_meshgrid, self.test_space = self.createTestGridSpace(self.param_list) + # All done! return - def n_train(self): + + + def n_train(self) -> int: + """ + Returns the number of combinations of parameters in the training set. + """ + return self.train_space.shape[0] - def n_test(self): + + + def n_test(self) -> int: + """ + Returns the number of combinations of parameters in the testing set. + """ + return self.test_space.shape[0] - - def createInitialTrainSpace(self, param_list): - paramRanges = [] - for param in param_list: - minval = param['min'] - maxval = param['max'] + + + def createInitialTrainSpace(self, param_list : list[dict]) -> np.ndarray: + """ + Sets up a grid of parameter values to train at. Note that we only use the min and max value + of each parameter when setting up this grid. + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + param_list: A list of parameter dictionaries. Each entry should be a dictionary with the + following keys: + - name + - min + - max + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + A 2d array of shape ((2)^k, k), where k is the number of parameters (k == len(param_list)). + The i'th column is the flattened i'th mesh_grid array we when we create a mesh grid using + the min and max value of each parameter as the argument. See "createHyperMeshGrid" for + details. + + Specifically, we return exactly what "createHyperGridSpace" returns. See the doc-string + for that function for further details. + """ + + # We need to know the min and max value for each parameter to set up the grid of possible + # parameter values. + paramRanges : list[np.ndarray] = [] + + for param in param_list: + # Fetch the min, max value of the current parameter. + minval : float = param['min'] + maxval : float = param['max'] + + # Store these values in an array and add them to the list. paramRanges += [np.array([minval, maxval])] - mesh_grids = self.createHyperMeshGrid(paramRanges) + # Use the ranges to set up a grid of possible parameter values. + mesh_grids : tuple[np.ndarray] = self.createHyperMeshGrid(paramRanges) + + # Now combine these grids into a 2d return self.createHyperGridSpace(mesh_grids) - def createTestGridSpace(self, param_list): - paramRanges = [] - gridSizes = [] + + def createTestGridSpace(self, param_list : list[dict]) -> tuple[list[int], tuple[np.ndarray], np.ndarray]: + """ + This function sets up a grid of parameter values to test at. 
+ + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + param_list: A list of parameter dictionaries. Each dictionary should either use the + "uniform" or "list" format. See create_uniform_1dspace and get_1dspace_from_list, + respectively. + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + A three element tuple. + + The first is a list whose i'th element specifies the number of distinct values of the i'th + parameter we consider (this is the length of the i'th element of "paramRanges" below). + + The second is a a tuple of k numpy ndarrays (where k = len(param_list)), the i'th one of + which is a k-dimensional array with shape (N0, ... , N{k - 1}), where Ni = + param_list[i].size whose i(0), ... , i(k - 1) element specifies the value of the i'th + parameter in the i(0), ... , i(k - 1)'th unique combination of parameter values. + + The third one is a 2d array of parameter values. It has shape (M, k), where + M = \prod_{i = 0}^{k - 1} param_list[i].size. + """ + + # Set up arrays to hold the parameter values + number of parameter values for each + # parameter. + paramRanges : np.ndarray = [] + gridSizes : list[int] = [] + + # Cycle through the parameters for param in param_list: - Nx, paramRange = getParam1DSpace[param['test_space_type']](param) - gridSizes += [Nx] - paramRanges += [paramRange] + # Fetch the set of possible parameter values (paramRange) + the size of this set (Nx) + Nx, paramRange = getParam1DSpace[param['test_space_type']](param) + + # Add Nx, ParamRange to their corresponding lists + gridSizes += [Nx] + paramRanges += [paramRange] - mesh_grids = self.createHyperMeshGrid(paramRanges) + # Now that we have the set of parameter values for each parameter, turn it into a grid. + mesh_grids : tuple[np.ndarray] = self.createHyperMeshGrid(paramRanges) + + # All done. Return a list specifying the number of possible values for each parameter, the + # grids of parameter values, and the flattened/2d version of it. return gridSizes, mesh_grids, self.createHyperGridSpace(mesh_grids) + + def getParameter(self, param_vector): ''' convert numpy array parameter vector to a dict. @@ -91,63 +318,210 @@ def getParameter(self, param_vector): return param - def createHyperMeshGrid(self, param_ranges): + + + def createHyperMeshGrid(self, param_ranges : list[np.ndarray]) -> tuple[np.ndarray]: ''' - param_ranges: list of numpy 1d arrays, each corresponding to 1d parameter grid space. - The list size is equal to the number of parameters. - - Output: paramSpaces - - tuple of numpy nd arrays, corresponding to each parameter. - Dimension of the array equals to the number of parameters + This function generates arrays of parameter values. Specifically, if there are k + parameters (param_ranges has k elements), then we return k k-d arrays, the i'th one of + which is a k-dimensional array whose i(0), ... , i(k - 1) element specifies the value of + the i'th variable in the i(0), ... , i(k - 1)'th unique combination of parameter values. 
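        As a purely illustrative example (these values are hypothetical, not taken from the
        code): if

            param_ranges = [np.array([1.0, 2.0]), np.array([0.1, 0.2, 0.3])]

        then we return two arrays, each of shape (2, 3); the first holds the first parameter's
        values (constant along each row) and the second holds the second parameter's values
        (constant along each column).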
-    def createHyperMeshGrid(self, param_ranges):
+
+
+    def createHyperMeshGrid(self, param_ranges : list[np.ndarray]) -> tuple[np.ndarray]:
         '''
-            param_ranges: list of numpy 1d arrays, each corresponding to 1d parameter grid space.
-                          The list size is equal to the number of parameters.
-
-            Output: paramSpaces
-            - tuple of numpy nd arrays, corresponding to each parameter.
-              Dimension of the array equals to the number of parameters
+            This function generates arrays of parameter values. Specifically, if there are k
+            parameters (param_ranges has k elements), then we return k arrays, the i'th one of
+            which is a k-dimensional array whose i(0), ... , i(k - 1) element specifies the value of
+            the i'th parameter in the i(0), ... , i(k - 1)'th unique combination of parameter values.
+
+
+            -------------------------------------------------------------------------------------------
+            Arguments
+            -------------------------------------------------------------------------------------------
+
+            param_ranges: list of numpy 1d arrays, each corresponding to a 1d parameter grid space.
+            The i'th element of this list should be a 1d numpy.ndarray object housing the possible
+            values of the i'th parameter (for the initial training grid, just its min and max
+            value). The list size should equal the number of parameters.
+
+
+            -------------------------------------------------------------------------------------------
+            Returns
+            -------------------------------------------------------------------------------------------
+
+            The "paramSpaces" tuple. This is a tuple of numpy ndarray objects, the i'th one of which
+            gives the grid of parameter values for the i'th parameter. Specifically, if there are
+            k parameters and if param_ranges[i].size = Ni, then the j'th return array has shape
+            (N0, ... , N{k - 1}) and the i(0), ... , i(k - 1) element of this array houses the i(j)'th
+            value of the j'th parameter.
+
+            Thus, if there are k parameters, the returned tuple has k elements, each one of
+            which is an array of shape (N0, ... , N{k - 1}).
         '''
-        args = ()
+
+        # Fetch the ranges, add them to a tuple (this is what the meshgrid function needs).
+        args : tuple[np.ndarray] = ()
         for paramRange in param_ranges:
             args += (paramRange,)
-        paramSpaces = np.meshgrid(*args, indexing='ij')
+        # Use numpy's meshgrid function to generate the grids of parameter values.
+        paramSpaces : tuple[np.ndarray] = np.meshgrid(*args, indexing='ij')
+
+        # All done!
         return paramSpaces
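A quick shape check of the description above, with made-up values; two parameters, one with three candidate values and one with two, give a pair of (3, 2) grids when indexing = 'ij':

import numpy as np

param_ranges = [np.array([0.0, 0.5, 1.0]), np.array([2.0, 3.0])]
paramSpaces  = np.meshgrid(*param_ranges, indexing = 'ij')
print([g.shape for g in paramSpaces])               # [(3, 2), (3, 2)]
print(paramSpaces[0][1, 0], paramSpaces[1][1, 0])   # 0.5 2.0 -> values in combination (1, 0)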
-    def createHyperGridSpace(self, mesh_grids):
+
+
+    def createHyperGridSpace(self, mesh_grids : tuple[np.ndarray]) -> np.ndarray:
         '''
-            mesh_grids: tuple of numpy nd arrays, corresponding to each parameter.
-                        Dimension of the array equals to the number of parameters
-
-            Output: param_grid
-            - numpy 2d array of size (grid size x number of parameters).
+            Flattens the mesh_grid numpy.ndarray objects returned by createHyperMeshGrid and combines
+            them into a single 2d array of shape (grid size, number of parameters) (see below).
+
+
+            -------------------------------------------------------------------------------------------
+            Arguments
+            -------------------------------------------------------------------------------------------
-              grid size is the size of a numpy nd array.
+
+            mesh_grids: tuple of numpy nd arrays, corresponding to each parameter. This should ALWAYS
+            be the output of the "createHyperMeshGrid" function. See the return section of that
+            function's docstring for details.
+
+
+            -------------------------------------------------------------------------------------------
+            Returns
+            -------------------------------------------------------------------------------------------
+
+            The param_grid. This is a 2d numpy.ndarray object of shape (grid size, number of
+            parameters). If each element of mesh_grids is a numpy.ndarray object of shape (N(1), ... ,
+            N(k)) (k parameters), then (grid size) = N(1)*N(2)*...*N(k) and (number of parameters) = k.
         '''
+
+        # For each parameter, we flatten its mesh_grid into a 1d array (of length (grid size)). We
+        # horizontally stack these flattened grids to get the final param_grid array.
+        param_grid : np.ndarray = None
         for k, paramSpace in enumerate(mesh_grids):
+            # Special treatment for the first parameter to initialize param_grid
            if (k == 0):
-                param_grid = paramSpace.reshape(-1, 1)
+                param_grid : np.ndarray = paramSpace.reshape(-1, 1)
                continue
-            param_grid = np.hstack((param_grid, paramSpace.reshape(-1, 1)))
+            # Flatten the new mesh grid and append it to the grid.
+            param_grid : np.ndarray = np.hstack((param_grid, paramSpace.reshape(-1, 1)))
+        # All done!
         return param_grid
-    def appendTrainSpace(self, param):
+
+
+    def appendTrainSpace(self, param : np.ndarray) -> None:
+        """
+        Adds a new parameter to self's train space attribute.
+
+
+        -------------------------------------------------------------------------------------------
+        Arguments
+        -------------------------------------------------------------------------------------------
+
+        param: A 1d numpy ndarray object. It should have shape (n_param) and should hold a
+        parameter value that we want to add to the training set.
+
+
+
+        -------------------------------------------------------------------------------------------
+        Returns
+        -------------------------------------------------------------------------------------------
+
+        Nothing!
+        """
+
+        # Make sure param has n_param components/can be appended to the set of training parameters.
         assert(self.train_space.shape[1] == param.size)
-        self.train_space = np.vstack((self.train_space, param))
+        # Add the new parameter to the training space by appending it as a new row to
+        # self.train_space
+        self.train_space : np.ndarray = np.vstack((self.train_space, param))
+
+        # All done!
         return
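The append is a plain vstack; a small sketch with made-up values shows how the training set grows by one row:

import numpy as np

train_space = np.array([[0.0, 2.0],
                        [1.0, 3.0]])      # shape (2, 2)
new_param   = np.array([0.5, 2.5])        # shape (2,), i.e. (n_param,)
assert train_space.shape[1] == new_param.size
train_space = np.vstack((train_space, new_param))
print(train_space.shape)                  # (3, 2)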
-    def export(self):
-        dict_ = {'train_space': self.train_space,
-                 'test_space': self.test_space,
-                 'test_grid_sizes': self.test_grid_sizes,
-                 'test_meshgrid': self.test_meshgrid,
-                 'n_init': self.n_init}
+
+
+    def export(self) -> dict:
+        """
+        This function packages the testing/training examples into a dictionary, which it returns.
+
+
+        -------------------------------------------------------------------------------------------
+        Arguments
+        -------------------------------------------------------------------------------------------
+
+        None!
+
+        -------------------------------------------------------------------------------------------
+        Returns
+        -------------------------------------------------------------------------------------------
+
+        A dictionary with 5 keys. Below is a list of the keys with a short description of each
+        corresponding value.
+        train_space: self.train_space, a 2d array of shape (n_train, n_param) whose i,j element
+        holds the value of the j'th parameter in the i'th training case.
+
+        test_space: self.test_space, a 2d array of shape (n_test, n_param) whose i,j element
+        holds the value of the j'th parameter in the i'th testing case.
+
+        test_grid_sizes: A list whose i'th element specifies how many distinct parameter values
+        we use for the i'th parameter.
+
+        test_meshgrid: a tuple of n_param numpy.ndarray objects whose i'th element is an
+        n_param-dimensional array whose i(1), i(2), ... , i(n_param) element holds the value of
+        the i'th parameter in the i(1), ... , i(n_param) combination of parameter values in the
+        testing set.
+
+        n_init: The number of parameter combinations in the initial training set.
+        """
+
+        # Build the dictionary
+        dict_ = {'train_space' : self.train_space,
+                 'test_space' : self.test_space,
+                 'test_grid_sizes' : self.test_grid_sizes,
+                 'test_meshgrid' : self.test_meshgrid,
+                 'n_init' : self.n_init}
+
+        # All done!
         return dict_
-    def load(self, dict_):
-        self.train_space = dict_['train_space']
-        self.test_space = dict_['test_space']
-        self.test_grid_sizes = dict_['test_grid_sizes']
-        self.test_meshgrid = dict_['test_meshgrid']
-
-        assert(self.n_init == dict_['n_init'])
-        assert(self.train_space.shape[1] == self.n_param)
-        assert(self.test_space.shape[1] == self.n_param)
+
+
+    def load(self, dict_ : dict) -> None:
+        """
+        This function builds a parameter space object from a dictionary. This dictionary should
+        be one that was returned by the export method, or a loaded copy of a dictionary that was
+        returned by the export method.
+
+
+        -------------------------------------------------------------------------------------------
+        Arguments
+        -------------------------------------------------------------------------------------------
+
+        dict_: This should be a dictionary with the following keys:
+            - train_space
+            - test_space
+            - test_grid_sizes
+            - test_meshgrid
+            - n_init
+        This dictionary should have been returned by the export method. We use the values in this
+        dictionary to set up self.
+
+
+        -------------------------------------------------------------------------------------------
+        Returns
+        -------------------------------------------------------------------------------------------
+
+        Nothing!
+        """
+
+        # Extract information from the dictionary.
+        self.train_space : np.ndarray = dict_['train_space']
+        self.test_space : np.ndarray = dict_['test_space']
+        self.test_grid_sizes : list[int] = dict_['test_grid_sizes']
+        self.test_meshgrid : tuple[np.ndarray] = dict_['test_meshgrid']
+
+        # Run checks
+        assert(self.n_init == dict_['n_init'])
+        assert(self.train_space.shape[1] == self.n_param)
+        assert(self.test_space.shape[1] == self.n_param)
+
+        # All done!
        return
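Since export/load only move numpy arrays, lists, and ints through a dictionary, the round trip can be exercised with a stand-in dictionary. The values and the file name below are made up; only the keys follow the docstrings above:

import numpy as np

# Stand-in for the dictionary export() produces and load() consumes.
d = {'train_space' : np.array([[0.0, 2.0], [0.0, 3.0], [1.0, 2.0], [1.0, 3.0]]),
     'test_space' : np.array([[0.0, 2.5], [0.5, 2.5], [1.0, 2.5]]),
     'test_grid_sizes' : [3, 1],
     'test_meshgrid' : np.meshgrid(np.array([0.0, 0.5, 1.0]), np.array([2.5]), indexing = 'ij'),
     'n_init' : 4}

# One simple way to persist and restore such a dictionary.
np.save('param_space.npy', d, allow_pickle = True)
d2 = np.load('param_space.npy', allow_pickle = True).item()
assert np.array_equal(d['train_space'], d2['train_space'])
assert d2['n_init'] == 4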