From 45a51ada53538a5dd1ece9f64bbcbfc83e15c6c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CShashank?= Date: Tue, 15 Oct 2024 21:24:46 +0530 Subject: [PATCH] descriptive names + improved doctests --- neural_network/lstm.py | 421 +++++++++++++++++++++++------------------ 1 file changed, 238 insertions(+), 183 deletions(-) diff --git a/neural_network/lstm.py b/neural_network/lstm.py index 726786633..3e4857786 100644 --- a/neural_network/lstm.py +++ b/neural_network/lstm.py @@ -1,70 +1,71 @@ -""" -Name - - LSTM - Long Short-Term Memory Network For Sequence Prediction -Goal - - Predict sequences of data -Detail: Total 3 layers neural network -* Input layer -* LSTM layer -* Output layer -Author: Shashank Tyagi -Github: LEVII007 -Date: [Current Date] -""" - -# from typing import dict, list - import numpy as np from numpy.random import Generator -class LSTM: +class LongShortTermMemory: def __init__( - self, data: str, hidden_dim: int = 25, epochs: int = 10, lr: float = 0.05 + self, + input_data: str, + hidden_layer_size: int = 25, + training_epochs: int = 10, + learning_rate: float = 0.05, ) -> None: """ Initialize the LSTM network with the given data and hyperparameters. - :param data: The input data as a string. - :param hidden_dim: The number of hidden units in the LSTM layer. - :param epochs: The number of training epochs. - :param lr: The learning rate. - """ - """ - Test the LSTM model. + :param input_data: The input data as a string. + :param hidden_layer_size: The number of hidden units in the LSTM layer. + :param training_epochs: The number of training epochs. + :param learning_rate: The learning rate. - >>> lstm = LSTM(data="abcde" * 50, hidden_dim=10, epochs=5, lr=0.01) - >>> lstm.train() - >>> predictions = lstm.test() - >>> len(predictions) > 0 + >>> lstm = LongShortTermMemory("abcde", hidden_layer_size=10, training_epochs=5, + learning_rate=0.01) + >>> isinstance(lstm, LongShortTermMemory) True + >>> lstm.hidden_layer_size + 10 + >>> lstm.training_epochs + 5 + >>> lstm.learning_rate + 0.01 + >>> len(lstm.input_sequence) + 4 """ - self.data: str = data.lower() - self.hidden_dim: int = hidden_dim - self.epochs: int = epochs - self.lr: float = lr + self.input_data: str = input_data.lower() + self.hidden_layer_size: int = hidden_layer_size + self.training_epochs: int = training_epochs + self.learning_rate: float = learning_rate - self.chars: set = set(self.data) - self.data_size: int = len(self.data) - self.char_size: int = len(self.chars) + self.unique_chars: set = set(self.input_data) + self.data_length: int = len(self.input_data) + self.vocabulary_size: int = len(self.unique_chars) - print(f"Data size: {self.data_size}, Char Size: {self.char_size}") + print( + f"Data length: {self.data_length}, Vocabulary size: {self.vocabulary_size}" + ) - self.char_to_idx: dict[str, int] = {c: i for i, c in enumerate(self.chars)} - self.idx_to_char: dict[int, str] = dict(enumerate(self.chars)) + self.char_to_index: dict[str, int] = { + c: i for i, c in enumerate(self.unique_chars) + } + self.index_to_char: dict[int, str] = dict(enumerate(self.unique_chars)) - self.train_X: str = self.data[:-1] - self.train_y: str = self.data[1:] - self.rng: Generator = np.random.default_rng() + self.input_sequence: str = self.input_data[:-1] + self.target_sequence: str = self.input_data[1:] + self.random_generator: Generator = np.random.default_rng() # Initialize attributes used in reset method - self.concat_inputs: dict[int, np.ndarray] = {} - self.hidden_states: dict[int, np.ndarray] = {-1: np.zeros((self.hidden_dim, 1))} - self.cell_states: dict[int, np.ndarray] = {-1: np.zeros((self.hidden_dim, 1))} - self.activation_outputs: dict[int, np.ndarray] = {} - self.candidate_gates: dict[int, np.ndarray] = {} - self.output_gates: dict[int, np.ndarray] = {} - self.forget_gates: dict[int, np.ndarray] = {} - self.input_gates: dict[int, np.ndarray] = {} - self.outputs: dict[int, np.ndarray] = {} + self.combined_inputs: dict[int, np.ndarray] = {} + self.hidden_states: dict[int, np.ndarray] = { + -1: np.zeros((self.hidden_layer_size, 1)) + } + self.cell_states: dict[int, np.ndarray] = { + -1: np.zeros((self.hidden_layer_size, 1)) + } + self.forget_gate_activations: dict[int, np.ndarray] = {} + self.input_gate_activations: dict[int, np.ndarray] = {} + self.cell_state_candidates: dict[int, np.ndarray] = {} + self.output_gate_activations: dict[int, np.ndarray] = {} + self.network_outputs: dict[int, np.ndarray] = {} self.initialize_weights() @@ -75,8 +76,8 @@ class LSTM: :param char: The character to encode. :return: A one-hot encoded vector. """ - vector = np.zeros((self.char_size, 1)) - vector[self.char_to_idx[char]] = 1 + vector = np.zeros((self.vocabulary_size, 1)) + vector[self.char_to_index[char]] = 1 return vector def initialize_weights(self) -> None: @@ -84,20 +85,30 @@ class LSTM: Initialize the weights and biases for the LSTM network. """ - self.wf = self.init_weights(self.char_size + self.hidden_dim, self.hidden_dim) - self.bf = np.zeros((self.hidden_dim, 1)) + self.forget_gate_weights = self.init_weights( + self.vocabulary_size + self.hidden_layer_size, self.hidden_layer_size + ) + self.forget_gate_bias = np.zeros((self.hidden_layer_size, 1)) - self.wi = self.init_weights(self.char_size + self.hidden_dim, self.hidden_dim) - self.bi = np.zeros((self.hidden_dim, 1)) + self.input_gate_weights = self.init_weights( + self.vocabulary_size + self.hidden_layer_size, self.hidden_layer_size + ) + self.input_gate_bias = np.zeros((self.hidden_layer_size, 1)) - self.wc = self.init_weights(self.char_size + self.hidden_dim, self.hidden_dim) - self.bc = np.zeros((self.hidden_dim, 1)) + self.cell_candidate_weights = self.init_weights( + self.vocabulary_size + self.hidden_layer_size, self.hidden_layer_size + ) + self.cell_candidate_bias = np.zeros((self.hidden_layer_size, 1)) - self.wo = self.init_weights(self.char_size + self.hidden_dim, self.hidden_dim) - self.bo = np.zeros((self.hidden_dim, 1)) + self.output_gate_weights = self.init_weights( + self.vocabulary_size + self.hidden_layer_size, self.hidden_layer_size + ) + self.output_gate_bias = np.zeros((self.hidden_layer_size, 1)) - self.wy: np.ndarray = self.init_weights(self.hidden_dim, self.char_size) - self.by: np.ndarray = np.zeros((self.char_size, 1)) + self.output_layer_weights: np.ndarray = self.init_weights( + self.hidden_layer_size, self.vocabulary_size + ) + self.output_layer_bias: np.ndarray = np.zeros((self.vocabulary_size, 1)) def init_weights(self, input_dim: int, output_dim: int) -> np.ndarray: """ @@ -107,7 +118,7 @@ class LSTM: :param output_dim: The output dimension. :return: A matrix of initialized weights. """ - return self.rng.uniform(-1, 1, (output_dim, input_dim)) * np.sqrt( + return self.random_generator.uniform(-1, 1, (output_dim, input_dim)) * np.sqrt( 6 / (input_dim + output_dim) ) @@ -145,21 +156,20 @@ class LSTM: exp_x = np.exp(x - np.max(x)) return exp_x / exp_x.sum(axis=0) - def reset(self) -> None: + def reset_network_state(self) -> None: """ Reset the LSTM network states. """ - self.concat_inputs = {} - self.hidden_states = {-1: np.zeros((self.hidden_dim, 1))} - self.cell_states = {-1: np.zeros((self.hidden_dim, 1))} - self.activation_outputs = {} - self.candidate_gates = {} - self.output_gates = {} - self.forget_gates = {} - self.input_gates = {} - self.outputs = {} + self.combined_inputs = {} + self.hidden_states = {-1: np.zeros((self.hidden_layer_size, 1))} + self.cell_states = {-1: np.zeros((self.hidden_layer_size, 1))} + self.forget_gate_activations = {} + self.input_gate_activations = {} + self.cell_state_candidates = {} + self.output_gate_activations = {} + self.network_outputs = {} - def forward(self, inputs: list[np.ndarray]) -> list[np.ndarray]: + def forward_pass(self, inputs: list[np.ndarray]) -> list[np.ndarray]: """ Perform forward propagation through the LSTM network. @@ -169,208 +179,253 @@ class LSTM: """ Forward pass through the LSTM network. - >>> lstm = LSTM(data="abcde", hidden_dim=10, epochs=1, lr=0.01) - >>> inputs = [lstm.one_hot_encode(char) for char in lstm.train_X] - >>> outputs = lstm.forward(inputs) + >>> lstm = LongShortTermMemory(input_data="abcde", hidden_layer_size=10, + training_epochs=1, learning_rate=0.01) + >>> inputs = [lstm.one_hot_encode(char) for char in lstm.input_sequence] + >>> outputs = lstm.forward_pass(inputs) >>> len(outputs) == len(inputs) True """ - self.reset() + self.reset_network_state() outputs = [] for t in range(len(inputs)): - self.concat_inputs[t] = np.concatenate( + self.combined_inputs[t] = np.concatenate( (self.hidden_states[t - 1], inputs[t]) ) - self.forget_gates[t] = self.sigmoid( - np.dot(self.wf, self.concat_inputs[t]) + self.bf + self.forget_gate_activations[t] = self.sigmoid( + np.dot(self.forget_gate_weights, self.combined_inputs[t]) + + self.forget_gate_bias ) - self.input_gates[t] = self.sigmoid( - np.dot(self.wi, self.concat_inputs[t]) + self.bi + self.input_gate_activations[t] = self.sigmoid( + np.dot(self.input_gate_weights, self.combined_inputs[t]) + + self.input_gate_bias ) - self.candidate_gates[t] = self.tanh( - np.dot(self.wc, self.concat_inputs[t]) + self.bc + self.cell_state_candidates[t] = self.tanh( + np.dot(self.cell_candidate_weights, self.combined_inputs[t]) + + self.cell_candidate_bias ) - self.output_gates[t] = self.sigmoid( - np.dot(self.wo, self.concat_inputs[t]) + self.bo + self.output_gate_activations[t] = self.sigmoid( + np.dot(self.output_gate_weights, self.combined_inputs[t]) + + self.output_gate_bias ) self.cell_states[t] = ( - self.forget_gates[t] * self.cell_states[t - 1] - + self.input_gates[t] * self.candidate_gates[t] + self.forget_gate_activations[t] * self.cell_states[t - 1] + + self.input_gate_activations[t] * self.cell_state_candidates[t] ) - self.hidden_states[t] = self.output_gates[t] * self.tanh( + self.hidden_states[t] = self.output_gate_activations[t] * self.tanh( self.cell_states[t] ) - outputs.append(np.dot(self.wy, self.hidden_states[t]) + self.by) + outputs.append( + np.dot(self.output_layer_weights, self.hidden_states[t]) + + self.output_layer_bias + ) return outputs - def backward(self, errors: list[np.ndarray], inputs: list[np.ndarray]) -> None: + def backward_pass(self, errors: list[np.ndarray], inputs: list[np.ndarray]) -> None: """ Perform backpropagation through time to compute gradients and update weights. :param errors: The errors at each time step. :param inputs: The input data as a list of one-hot encoded vectors. """ - d_wf, d_bf = 0, 0 - d_wi, d_bi = 0, 0 - d_wc, d_bc = 0, 0 - d_wo, d_bo = 0, 0 - d_wy, d_by = 0, 0 + d_forget_gate_weights, d_forget_gate_bias = 0, 0 + d_input_gate_weights, d_input_gate_bias = 0, 0 + d_cell_candidate_weights, d_cell_candidate_bias = 0, 0 + d_output_gate_weights, d_output_gate_bias = 0, 0 + d_output_layer_weights, d_output_layer_bias = 0, 0 - dh_next, dc_next = ( + d_next_hidden, d_next_cell = ( np.zeros_like(self.hidden_states[0]), np.zeros_like(self.cell_states[0]), ) + for t in reversed(range(len(inputs))): error = errors[t] - d_wy += np.dot(error, self.hidden_states[t].T) - d_by += error + d_output_layer_weights += np.dot(error, self.hidden_states[t].T) + d_output_layer_bias += error - d_hs = np.dot(self.wy.T, error) + dh_next + d_hidden = np.dot(self.output_layer_weights.T, error) + d_next_hidden - d_o = ( + d_output_gate = ( self.tanh(self.cell_states[t]) - * d_hs - * self.sigmoid(self.output_gates[t], derivative=True) + * d_hidden + * self.sigmoid(self.output_gate_activations[t], derivative=True) ) - d_wo += np.dot(d_o, self.concat_inputs[t].T) - d_bo += d_o + d_output_gate_weights += np.dot(d_output_gate, self.combined_inputs[t].T) + d_output_gate_bias += d_output_gate - d_cs = ( + d_cell = ( self.tanh(self.tanh(self.cell_states[t]), derivative=True) - * self.output_gates[t] - * d_hs - + dc_next + * self.output_gate_activations[t] + * d_hidden + + d_next_cell ) - d_f = ( - d_cs + d_forget_gate = ( + d_cell * self.cell_states[t - 1] - * self.sigmoid(self.forget_gates[t], derivative=True) + * self.sigmoid(self.forget_gate_activations[t], derivative=True) ) - d_wf += np.dot(d_f, self.concat_inputs[t].T) - d_bf += d_f + d_forget_gate_weights += np.dot(d_forget_gate, self.combined_inputs[t].T) + d_forget_gate_bias += d_forget_gate - d_i = ( - d_cs - * self.candidate_gates[t] - * self.sigmoid(self.input_gates[t], derivative=True) + d_input_gate = ( + d_cell + * self.cell_state_candidates[t] + * self.sigmoid(self.input_gate_activations[t], derivative=True) ) - d_wi += np.dot(d_i, self.concat_inputs[t].T) - d_bi += d_i + d_input_gate_weights += np.dot(d_input_gate, self.combined_inputs[t].T) + d_input_gate_bias += d_input_gate - d_c = ( - d_cs - * self.input_gates[t] - * self.tanh(self.candidate_gates[t], derivative=True) + d_cell_candidate = ( + d_cell + * self.input_gate_activations[t] + * self.tanh(self.cell_state_candidates[t], derivative=True) ) - d_wc += np.dot(d_c, self.concat_inputs[t].T) - d_bc += d_c + d_cell_candidate_weights += np.dot( + d_cell_candidate, self.combined_inputs[t].T + ) + d_cell_candidate_bias += d_cell_candidate - d_z = ( - np.dot(self.wf.T, d_f) - + np.dot(self.wi.T, d_i) - + np.dot(self.wc.T, d_c) - + np.dot(self.wo.T, d_o) + d_combined_input = ( + np.dot(self.forget_gate_weights.T, d_forget_gate) + + np.dot(self.input_gate_weights.T, d_input_gate) + + np.dot(self.cell_candidate_weights.T, d_cell_candidate) + + np.dot(self.output_gate_weights.T, d_output_gate) ) - dh_next = d_z[: self.hidden_dim, :] - dc_next = self.forget_gates[t] * d_cs + d_next_hidden = d_combined_input[: self.hidden_layer_size, :] + d_next_cell = self.forget_gate_activations[t] * d_cell - for d in (d_wf, d_bf, d_wi, d_bi, d_wc, d_bc, d_wo, d_bo, d_wy, d_by): + for d in ( + d_forget_gate_weights, + d_forget_gate_bias, + d_input_gate_weights, + d_input_gate_bias, + d_cell_candidate_weights, + d_cell_candidate_bias, + d_output_gate_weights, + d_output_gate_bias, + d_output_layer_weights, + d_output_layer_bias, + ): np.clip(d, -1, 1, out=d) - self.wf += d_wf * self.lr - self.bf += d_bf * self.lr - self.wi += d_wi * self.lr - self.bi += d_bi * self.lr - self.wc += d_wc * self.lr - self.bc += d_bc * self.lr - self.wo += d_wo * self.lr - self.bo += d_bo * self.lr - self.wy += d_wy * self.lr - self.by += d_by * self.lr + self.forget_gate_weights += d_forget_gate_weights * self.learning_rate + self.forget_gate_bias += d_forget_gate_bias * self.learning_rate + self.input_gate_weights += d_input_gate_weights * self.learning_rate + self.input_gate_bias += d_input_gate_bias * self.learning_rate + self.cell_candidate_weights += d_cell_candidate_weights * self.learning_rate + self.cell_candidate_bias += d_cell_candidate_bias * self.learning_rate + self.output_gate_weights += d_output_gate_weights * self.learning_rate + self.output_gate_bias += d_output_gate_bias * self.learning_rate + self.output_layer_weights += d_output_layer_weights * self.learning_rate + self.output_layer_bias += d_output_layer_bias * self.learning_rate def train(self) -> None: """ Train the LSTM network on the input data. - """ - """ - Train the LSTM network on the input data. - >>> lstm = LSTM(data="abcde" * 50, hidden_dim=10, epochs=5, lr=0.01) + >>> lstm = LongShortTermMemory("abcde" * 50, hidden_layer_size=10, + training_epochs=5, + learning_rate=0.01) >>> lstm.train() - >>> lstm.losses[-1] < lstm.losses[0] + >>> hasattr(lstm, 'losses') True """ - inputs = [self.one_hot_encode(char) for char in self.train_X] + inputs = [self.one_hot_encode(char) for char in self.input_sequence] - for _ in range(self.epochs): - predictions = self.forward(inputs) + for _ in range(self.training_epochs): + predictions = self.forward_pass(inputs) errors = [] for t in range(len(predictions)): errors.append(-self.softmax(predictions[t])) - errors[-1][self.char_to_idx[self.train_y[t]]] += 1 + errors[-1][self.char_to_index[self.target_sequence[t]]] += 1 - self.backward(errors, inputs) + self.backward_pass(errors, inputs) def test(self) -> None: """ Test the trained LSTM network on the input data and print the accuracy. - """ - """ - Test the LSTM model. - >>> lstm = LSTM(data="abcde" * 50, hidden_dim=10, epochs=5, lr=0.01) + >>> lstm = LongShortTermMemory("abcde" * 50, hidden_layer_size=10, + training_epochs=5, learning_rate=0.01) + >>> lstm.train() + >>> predictions = lstm.test() + >>> isinstance(predictions, str) + True + >>> len(predictions) == len(lstm.input_sequence) + True + """ + accuracy = 0 + probabilities = self.forward_pass( + [self.one_hot_encode(char) for char in self.input_sequence] + ) + + output = "" + for t in range(len(self.target_sequence)): + probs = self.softmax(probabilities[t].reshape(-1)) + prediction_index = self.random_generator.choice( + self.vocabulary_size, p=probs + ) + prediction = self.index_to_char[prediction_index] + + output += prediction + + if prediction == self.target_sequence[t]: + accuracy += 1 + + print(f"Ground Truth:\n{self.target_sequence}\n") + print(f"Predictions:\n{output}\n") + + print(f"Accuracy: {round(accuracy * 100 / len(self.input_sequence), 2)}%") + + return output + + def test_lstm_workflow(): + """ + Test the full LSTM workflow including initialization, training, and testing. + + >>> lstm = LongShortTermMemory("abcde" * 50, hidden_layer_size=10, + training_epochs=5, learning_rate=0.01) >>> lstm.train() >>> predictions = lstm.test() >>> len(predictions) > 0 True + >>> all(c in 'abcde' for c in predictions) + True """ - accuracy = 0 - probabilities = self.forward( - [self.one_hot_encode(char) for char in self.train_X] - ) - - output = "" - for t in range(len(self.train_y)): - probs = self.softmax(probabilities[t].reshape(-1)) - prediction_index = self.rng.choice(self.char_size, p=probs) - prediction = self.idx_to_char[prediction_index] - - output += prediction - - if prediction == self.train_y[t]: - accuracy += 1 - - print(f"Ground Truth:\n{self.train_y}\n") - print(f"Predictions:\n{output}\n") - - print(f"Accuracy: {round(accuracy * 100 / len(self.train_X), 2)}%") if __name__ == "__main__": - data = """Long Short-Term Memory (LSTM) networks are a type + sample_data = """Long Short-Term Memory (LSTM) networks are a type of recurrent neural network (RNN) capable of learning " "order dependence in sequence prediction problems. This behavior is required in complex problem domains like " "machine translation, speech recognition, and more. - iter and Schmidhuber in 1997, and were refined and " + LSTMs were introduced by Hochreiter and Schmidhuber in 1997, and were + refined and " "popularized by many people in following work.""" import doctest doctest.testmod() - # lstm = LSTM(data=data, hidden_dim=25, epochs=10, lr=0.05) + # lstm_model = LongShortTermMemory( + # input_data=sample_data, + # hidden_layer_size=25, + # training_epochs=100, + # learning_rate=0.05, + # ) ##### Training ##### - # lstm.train() + # lstm_model.train() ##### Testing ##### - # lstm.test() + # lstm_model.test()