diff --git a/neuralnetlib/callbacks.py b/neuralnetlib/callbacks.py
index ee3d910..73ce057 100644
--- a/neuralnetlib/callbacks.py
+++ b/neuralnetlib/callbacks.py
@@ -1,5 +1,6 @@
 import numpy as np
 
+
 class EarlyStopping:
     def __init__(self, patience: int = 5, min_delta: float = 0.001, restore_best_weights: bool = True,
                  start_from_epoch: int = 0, monitor: list = None, mode: str = 'auto', baseline: float = None):
@@ -40,13 +41,13 @@ def on_epoch_end(self, model, loss, metrics=None):
         if self.monitor is None:
             current_metric = loss
             if (self.mode == 'min' and current_metric < self.best_metric - self.min_delta) or \
-                (self.mode == 'max' and current_metric > self.best_metric + self.min_delta):
+                    (self.mode == 'max' and current_metric > self.best_metric + self.min_delta):
                 self.best_metric = current_metric
                 improved = True
         else:
             current_metric = metrics[self.monitor[0].__name__]
             if (self.mode == 'max' and current_metric > self.best_metric + self.min_delta) or \
-                (self.mode == 'min' and current_metric < self.best_metric - self.min_delta):
+                    (self.mode == 'min' and current_metric < self.best_metric - self.min_delta):
                 self.best_metric = current_metric
                 improved = True
 
@@ -66,8 +67,9 @@ def on_epoch_end(self, model, loss, metrics=None):
             self.stop_training = True
             print(f"\nEarly stopping after {self.epoch} epochs.", end='')
             if self.restore_best_weights and self.best_weights is not None:
-                for layer, best_weights in zip([layer for layer in model.layers if hasattr(layer, 'weights')], self.best_weights):
+                for layer, best_weights in zip([layer for layer in model.layers if hasattr(layer, 'weights')],
+                                               self.best_weights):
                     layer.weights = best_weights
             return True
 
-        return False
\ No newline at end of file
+        return False
diff --git a/neuralnetlib/layers.py b/neuralnetlib/layers.py
index 1eca562..86f7602 100644
--- a/neuralnetlib/layers.py
+++ b/neuralnetlib/layers.py
@@ -81,7 +81,8 @@ def from_config(config: dict):
 
 
 class Dense(Layer):
-    def __init__(self, units: int, weights_init: str = "default", bias_init: str = "default", random_state: int = None, **kwargs):
+    def __init__(self, units: int, weights_init: str = "default", bias_init: str = "default", random_state: int = None,
+                 **kwargs):
         self.units = units
         self.weights = None
 
@@ -466,7 +467,7 @@ def _pool(input_data: np.ndarray, pool_size: tuple, stride: tuple, padding: str)
         for i in range(out_height):
             for j in range(out_width):
                 input_slice = padded_input[:, :, i * stride[0]:i * stride[0] + pool_size[0],
-                              j * stride[1]:j * stride[1] + pool_size[1]]
+                                           j * stride[1]:j * stride[1] + pool_size[1]]
                 output[:, :, i, j] = np.max(input_slice, axis=(2, 3))
 
         return output
@@ -493,16 +494,16 @@ def _pool_backward(output_error: np.ndarray, input_data: np.ndarray, pool_size:
 
         for i in range(out_height):
             for j in range(out_width):
                 input_slice = padded_input[:, :, i * stride[0]:i * stride[0] + pool_size[0],
-                              j * stride[1]:j * stride[1] + pool_size[1]]
+                                           j * stride[1]:j * stride[1] + pool_size[1]]
                 mask = (input_slice == np.max(
                     input_slice, axis=(2, 3), keepdims=True))
                 d_input[:, :, i * stride[0]:i * stride[0] + pool_size[0],
-                j * stride[1]:j * stride[1] + pool_size[1]] += output_error[:, :, i, j][:, :, np.newaxis,
-                                                               np.newaxis] * mask
+                        j * stride[1]:j * stride[1] + pool_size[1]] += output_error[:, :, i, j][:, :, np.newaxis,
+                                                                                                np.newaxis] * mask
 
         if padding == 'same':
             d_input = d_input[:, :, pad_height:-
-                              pad_height, pad_width:-pad_width]
+            pad_height, pad_width:-pad_width]
 
         return d_input
@@ -754,7 +755,7 @@ def _pool_backward(output_error: np.ndarray, input_data: np.ndarray, pool_size:
             input_slice = padded_input[:, :, i * stride:i * stride + pool_size]
             mask = (input_slice == np.max(input_slice, axis=2, keepdims=True))
             d_input[:, :, i * stride:i * stride +
-                    pool_size] += output_error[:, :, i][:, :, np.newaxis] * mask
+            pool_size] += output_error[:, :, i][:, :, np.newaxis] * mask
 
         if padding == 'same':
             d_input = d_input[:, :, pad_length:-pad_length]
@@ -864,16 +865,16 @@ def forward_pass(self, input_data: np.ndarray, training: bool = True) -> np.ndar
             mean = np.mean(input_data, axis=0)
             var = np.var(input_data, axis=0)
             self.running_mean = self.momentum * \
-                self.running_mean + (1 - self.momentum) * mean
+                                self.running_mean + (1 - self.momentum) * mean
             self.running_var = self.momentum * \
-                self.running_var + (1 - self.momentum) * var
+                               self.running_var + (1 - self.momentum) * var
         else:
             mean = self.running_mean
             var = self.running_var
 
         self.input_centered = input_data - mean
         self.input_normalized = self.input_centered / \
-            np.sqrt(var + self.epsilon)
+                                np.sqrt(var + self.epsilon)
 
         return self.gamma * self.input_normalized + self.beta
@@ -883,7 +884,7 @@ def backward_pass(self, output_error: np.ndarray) -> np.ndarray:
 
         d_input_normalized = output_error * self.gamma
         d_var = np.sum(d_input_normalized * self.input_centered, axis=0) * -0.5 * (
-            self.input_centered / (self.input_centered.var(axis=0) + self.epsilon) ** 1.5)
+                self.input_centered / (self.input_centered.var(axis=0) + self.epsilon) ** 1.5)
         d_mean = np.sum(d_input_normalized, axis=0) * -1 / np.sqrt(
             self.input_centered.var(axis=0) + self.epsilon) - 2 * d_var * np.mean(self.input_centered, axis=0)
         d_input = d_input_normalized / np.sqrt(
@@ -969,7 +970,7 @@ def _pool(input_data: np.ndarray, pool_size: tuple, stride: tuple, padding: str)
         for i in range(out_height):
             for j in range(out_width):
                 input_slice = padded_input[:, :, i * stride[0]:i * stride[0] + pool_size[0],
-                              j * stride[1]:j * stride[1] + pool_size[1]]
+                                           j * stride[1]:j * stride[1] + pool_size[1]]
                 output[:, :, i, j] = np.mean(input_slice, axis=(2, 3))
 
         return output
@@ -996,12 +997,12 @@ def _pool_backward(output_error: np.ndarray, input_data: np.ndarray, pool_size:
 
         for i in range(out_height):
            for j in range(out_width):
                 d_input[:, :, i * stride[0]:i * stride[0] + pool_size[0],
-                j * stride[1]:j * stride[1] + pool_size[1]] += output_error[:, :, i, j][:, :, np.newaxis,
-                                                               np.newaxis] / np.prod(pool_size)
+                        j * stride[1]:j * stride[1] + pool_size[1]] += output_error[:, :, i, j][:, :, np.newaxis,
+                                                                                                np.newaxis] / np.prod(pool_size)
 
         if padding == 'same':
             d_input = d_input[:, :, pad_height:-
-                              pad_height, pad_width:-pad_width]
+            pad_height, pad_width:-pad_width]
 
         return d_input
@@ -1080,7 +1081,7 @@ def _pool_backward(output_error: np.ndarray, input_data: np.ndarray, pool_size:
 
         for i in range(out_steps):
             d_input[:, i * stride:i * stride + pool_size,
-                    :] += output_error[:, i, :][:, np.newaxis, :] / pool_size
+            :] += output_error[:, i, :][:, np.newaxis, :] / pool_size
 
         if padding == 'same':
             d_input = d_input[:, pad_steps:-pad_steps, :]
@@ -1109,12 +1110,13 @@ def backward_pass(self, output_error: np.ndarray) -> np.ndarray:
     def get_config(self) -> dict:
         config = {'name': self.__class__.__name__, 'dims': self.dims}
         config.update({key: getattr(self, key)
-                      for key in self.__dict__ if key not in ['dims']})
+                       for key in self.__dict__ if key not in ['dims']})
         return config
 
     @staticmethod
     def from_config(config: dict):
-        return Permute(config['dims'], **{key: value for key, value in config.items() if key != 'name' and key != 'dims'})
+        return Permute(config['dims'],
+                       **{key: value for key, value in config.items() if key != 'name' and key != 'dims'})
 
 
 # --------------------------------------------------------------------------------------------------------------
@@ -1123,7 +1125,8 @@ def from_config(config: dict):
 compatibility_dict = {
     Input: [Dense, Conv2D, Conv1D, Embedding, Permute],
     Dense: [Dense, Activation, Dropout, BatchNormalization, Permute],
-    Activation: [Dense, Conv2D, Conv1D, MaxPooling2D, AveragePooling2D, MaxPooling1D, AveragePooling1D, Flatten, Dropout, Permute],
+    Activation: [Dense, Conv2D, Conv1D, MaxPooling2D, AveragePooling2D, MaxPooling1D, AveragePooling1D, Flatten,
+                 Dropout, Permute],
     Conv2D: [Conv2D, MaxPooling2D, AveragePooling2D, Activation, Dropout, Flatten, BatchNormalization, Permute],
     MaxPooling2D: [Conv2D, MaxPooling2D, AveragePooling2D, Flatten, Permute],
     AveragePooling2D: [Conv2D, MaxPooling2D, AveragePooling2D, Flatten, Permute],
diff --git a/neuralnetlib/losses.py b/neuralnetlib/losses.py
index c382576..a596aee 100644
--- a/neuralnetlib/losses.py
+++ b/neuralnetlib/losses.py
@@ -27,7 +27,7 @@ def from_config(config: dict) -> 'LossFunction':
             return HuberLoss(config['delta'])
         else:
             raise ValueError(f'Unknown loss function: {config["name"]}')
-    
+
     @staticmethod
     def from_name(name: str) -> "LossFunction":
         name = name.lower().replace("_", "")
@@ -46,7 +46,7 @@ def from_name(name: str) -> "LossFunction":
 
         for subclass in LossFunction.__subclasses__():
             if subclass.__name__.lower() == name:
                 return subclass()
-        
+
         raise ValueError(f"No loss function found for the name: {name}")
@@ -85,7 +85,7 @@ def derivative(self, y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
             return -y_true / y_pred
         except Exception as e:
             print(e, "Make sure to one-hot encode your labels.", sep="\n")
-    
+
     def __str__(self):
         return "CategoricalCrossentropy"
 
diff --git a/neuralnetlib/model.py b/neuralnetlib/model.py
index 3b65951..2703f67 100644
--- a/neuralnetlib/model.py
+++ b/neuralnetlib/model.py
@@ -1,17 +1,17 @@
 import json
+import os
 import time
 
-import matplotlib
+import matplotlib
+import matplotlib.pyplot as plt
 import numpy as np
-import os
 
 from neuralnetlib.activations import ActivationFunction
 from neuralnetlib.layers import Layer, Input, Activation, Dropout, compatibility_dict
 from neuralnetlib.losses import LossFunction, CategoricalCrossentropy
-from neuralnetlib.preprocessing import PCA
 from neuralnetlib.optimizers import Optimizer
+from neuralnetlib.preprocessing import PCA
 from neuralnetlib.utils import shuffle, progress_bar, is_interactive
-import matplotlib.pyplot as plt
 
 
 class Model:
@@ -324,4 +324,4 @@ def __update_plot(self, epoch, x_train, y_train, random_state):
 
         ax.set_title(f"Decision Boundary (Epoch {epoch + 1})")
         fig.canvas.draw()
-        plt.pause(0.1)
\ No newline at end of file
+        plt.pause(0.1)
diff --git a/neuralnetlib/optimizers.py b/neuralnetlib/optimizers.py
index 62c1e06..5a937c2 100644
--- a/neuralnetlib/optimizers.py
+++ b/neuralnetlib/optimizers.py
@@ -26,7 +26,7 @@ def from_config(config: dict):
             return Adam.from_config(config)
         else:
             raise ValueError(f"Unknown optimizer name: {config['name']}")
-    
+
     @staticmethod
     def from_name(name: str) -> "Optimizer":
         name = name.lower().replace("_", "")
@@ -34,7 +34,7 @@ def from_name(name: str) -> "Optimizer":
 
         for subclass in Optimizer.__subclasses__():
             if subclass.__name__.lower() == name:
                 return subclass()
-    
+
         raise ValueError(f"No optimizer found for the name: {name}")
 
diff --git a/neuralnetlib/utils.py b/neuralnetlib/utils.py
index 37763d5..e8e2a33 100644
--- a/neuralnetlib/utils.py
+++ b/neuralnetlib/utils.py
@@ -77,4 +77,4 @@ def is_interactive():
         import __main__ as main
         return not hasattr(main, '__file__')
     except:
-        return False
\ No newline at end of file
+        return False