descriptive names + improved doctests

Shashank 2024-10-15 21:24:46 +05:30
parent 831c57f61f
commit 45a51ada53


@@ -1,70 +1,71 @@
"""
Name - - LSTM - Long Short-Term Memory Network For Sequence Prediction
Goal - - Predict sequences of data
Detail: Total 3 layers neural network
* Input layer
* LSTM layer
* Output layer
Author: Shashank Tyagi
Github: LEVII007
Date: [Current Date]
"""
# from typing import dict, list

import numpy as np
from numpy.random import Generator
class LongShortTermMemory:
    def __init__(
        self,
        input_data: str,
        hidden_layer_size: int = 25,
        training_epochs: int = 10,
        learning_rate: float = 0.05,
    ) -> None:
        """
        Initialize the LSTM network with the given data and hyperparameters.

        :param input_data: The input data as a string.
        :param hidden_layer_size: The number of hidden units in the LSTM layer.
        :param training_epochs: The number of training epochs.
        :param learning_rate: The learning rate.

        >>> lstm = LongShortTermMemory("abcde", hidden_layer_size=10,
        ...     training_epochs=5, learning_rate=0.01)
        Data length: 5, Vocabulary size: 5
        >>> isinstance(lstm, LongShortTermMemory)
        True
        >>> lstm.hidden_layer_size
        10
        >>> lstm.training_epochs
        5
        >>> lstm.learning_rate
        0.01
        >>> len(lstm.input_sequence)
        4
        """
        self.input_data: str = input_data.lower()

        self.hidden_layer_size: int = hidden_layer_size
        self.training_epochs: int = training_epochs
        self.learning_rate: float = learning_rate

        self.unique_chars: set = set(self.input_data)
        self.data_length: int = len(self.input_data)
        self.vocabulary_size: int = len(self.unique_chars)

        print(
            f"Data length: {self.data_length}, Vocabulary size: {self.vocabulary_size}"
        )

        self.char_to_index: dict[str, int] = {
            c: i for i, c in enumerate(self.unique_chars)
        }
        self.index_to_char: dict[int, str] = dict(enumerate(self.unique_chars))

        self.input_sequence: str = self.input_data[:-1]
        self.target_sequence: str = self.input_data[1:]

        self.random_generator: Generator = np.random.default_rng()

        # Initialize attributes used in reset method
        self.combined_inputs: dict[int, np.ndarray] = {}
        self.hidden_states: dict[int, np.ndarray] = {
            -1: np.zeros((self.hidden_layer_size, 1))
        }
        self.cell_states: dict[int, np.ndarray] = {
            -1: np.zeros((self.hidden_layer_size, 1))
        }
        self.forget_gate_activations: dict[int, np.ndarray] = {}
        self.input_gate_activations: dict[int, np.ndarray] = {}
        self.cell_state_candidates: dict[int, np.ndarray] = {}
        self.output_gate_activations: dict[int, np.ndarray] = {}
        self.network_outputs: dict[int, np.ndarray] = {}

        self.initialize_weights()
@@ -75,8 +76,8 @@ class LSTM:
        :param char: The character to encode.
        :return: A one-hot encoded vector.
        """
        vector = np.zeros((self.vocabulary_size, 1))
        vector[self.char_to_index[char]] = 1
        return vector

    def initialize_weights(self) -> None:
@@ -84,20 +85,30 @@ class LSTM:
        Initialize the weights and biases for the LSTM network.
        """
        self.forget_gate_weights = self.init_weights(
            self.vocabulary_size + self.hidden_layer_size, self.hidden_layer_size
        )
        self.forget_gate_bias = np.zeros((self.hidden_layer_size, 1))

        self.input_gate_weights = self.init_weights(
            self.vocabulary_size + self.hidden_layer_size, self.hidden_layer_size
        )
        self.input_gate_bias = np.zeros((self.hidden_layer_size, 1))

        self.cell_candidate_weights = self.init_weights(
            self.vocabulary_size + self.hidden_layer_size, self.hidden_layer_size
        )
        self.cell_candidate_bias = np.zeros((self.hidden_layer_size, 1))

        self.output_gate_weights = self.init_weights(
            self.vocabulary_size + self.hidden_layer_size, self.hidden_layer_size
        )
        self.output_gate_bias = np.zeros((self.hidden_layer_size, 1))

        self.output_layer_weights: np.ndarray = self.init_weights(
            self.hidden_layer_size, self.vocabulary_size
        )
        self.output_layer_bias: np.ndarray = np.zeros((self.vocabulary_size, 1))
    def init_weights(self, input_dim: int, output_dim: int) -> np.ndarray:
        """
@@ -107,7 +118,7 @@ class LSTM:
        :param output_dim: The output dimension.
        :return: A matrix of initialized weights.
        """
        return self.random_generator.uniform(-1, 1, (output_dim, input_dim)) * np.sqrt(
            6 / (input_dim + output_dim)
        )
@@ -145,21 +156,20 @@ class LSTM:
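        # Subtracting the running maximum keeps the exponentials from
        # overflowing without changing the softmax result.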
        exp_x = np.exp(x - np.max(x))
        return exp_x / exp_x.sum(axis=0)

    def reset_network_state(self) -> None:
        """
        Reset the LSTM network states.
        """
        self.combined_inputs = {}
        self.hidden_states = {-1: np.zeros((self.hidden_layer_size, 1))}
        self.cell_states = {-1: np.zeros((self.hidden_layer_size, 1))}
        self.forget_gate_activations = {}
        self.input_gate_activations = {}
        self.cell_state_candidates = {}
        self.output_gate_activations = {}
        self.network_outputs = {}

    def forward_pass(self, inputs: list[np.ndarray]) -> list[np.ndarray]:
        """
        Perform forward propagation through the LSTM network.
@@ -169,208 +179,253 @@ class LSTM:
""" """
Forward pass through the LSTM network. Forward pass through the LSTM network.
>>> lstm = LSTM(data="abcde", hidden_dim=10, epochs=1, lr=0.01) >>> lstm = LongShortTermMemory(input_data="abcde", hidden_layer_size=10,
>>> inputs = [lstm.one_hot_encode(char) for char in lstm.train_X] training_epochs=1, learning_rate=0.01)
>>> outputs = lstm.forward(inputs) >>> inputs = [lstm.one_hot_encode(char) for char in lstm.input_sequence]
>>> outputs = lstm.forward_pass(inputs)
>>> len(outputs) == len(inputs) >>> len(outputs) == len(inputs)
True True
""" """
self.reset() self.reset_network_state()
outputs = [] outputs = []
        for t in range(len(inputs)):
            self.combined_inputs[t] = np.concatenate(
                (self.hidden_states[t - 1], inputs[t])
            )
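            # Standard LSTM cell equations:
            #   f_t = sigmoid(W_f [h_{t-1}, x_t] + b_f)   (forget gate)
            #   i_t = sigmoid(W_i [h_{t-1}, x_t] + b_i)   (input gate)
            #   g_t = tanh(W_c [h_{t-1}, x_t] + b_c)      (cell candidate)
            #   o_t = sigmoid(W_o [h_{t-1}, x_t] + b_o)   (output gate)
            #   c_t = f_t * c_{t-1} + i_t * g_t
            #   h_t = o_t * tanh(c_t)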
            self.forget_gate_activations[t] = self.sigmoid(
                np.dot(self.forget_gate_weights, self.combined_inputs[t])
                + self.forget_gate_bias
            )
            self.input_gate_activations[t] = self.sigmoid(
                np.dot(self.input_gate_weights, self.combined_inputs[t])
                + self.input_gate_bias
            )
            self.cell_state_candidates[t] = self.tanh(
                np.dot(self.cell_candidate_weights, self.combined_inputs[t])
                + self.cell_candidate_bias
            )
            self.output_gate_activations[t] = self.sigmoid(
                np.dot(self.output_gate_weights, self.combined_inputs[t])
                + self.output_gate_bias
            )

            self.cell_states[t] = (
                self.forget_gate_activations[t] * self.cell_states[t - 1]
                + self.input_gate_activations[t] * self.cell_state_candidates[t]
            )
            self.hidden_states[t] = self.output_gate_activations[t] * self.tanh(
                self.cell_states[t]
            )

            outputs.append(
                np.dot(self.output_layer_weights, self.hidden_states[t])
                + self.output_layer_bias
            )

        return outputs

    def backward_pass(
        self, errors: list[np.ndarray], inputs: list[np.ndarray]
    ) -> None:
        """
        Perform backpropagation through time to compute gradients and update weights.

        :param errors: The errors at each time step.
        :param inputs: The input data as a list of one-hot encoded vectors.
        """
        d_forget_gate_weights, d_forget_gate_bias = 0, 0
        d_input_gate_weights, d_input_gate_bias = 0, 0
        d_cell_candidate_weights, d_cell_candidate_bias = 0, 0
        d_output_gate_weights, d_output_gate_bias = 0, 0
        d_output_layer_weights, d_output_layer_bias = 0, 0

        d_next_hidden, d_next_cell = (
            np.zeros_like(self.hidden_states[0]),
            np.zeros_like(self.cell_states[0]),
        )
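        # Backpropagation through time: walk the sequence in reverse and
        # accumulate gradients for every gate at every step.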
        for t in reversed(range(len(inputs))):
            error = errors[t]

            d_output_layer_weights += np.dot(error, self.hidden_states[t].T)
            d_output_layer_bias += error

            d_hidden = np.dot(self.output_layer_weights.T, error) + d_next_hidden

            d_output_gate = (
                self.tanh(self.cell_states[t])
                * d_hidden
                * self.sigmoid(self.output_gate_activations[t], derivative=True)
            )
            d_output_gate_weights += np.dot(d_output_gate, self.combined_inputs[t].T)
            d_output_gate_bias += d_output_gate

            d_cell = (
                self.tanh(self.tanh(self.cell_states[t]), derivative=True)
                * self.output_gate_activations[t]
                * d_hidden
                + d_next_cell
            )

            d_forget_gate = (
                d_cell
                * self.cell_states[t - 1]
                * self.sigmoid(self.forget_gate_activations[t], derivative=True)
            )
            d_forget_gate_weights += np.dot(d_forget_gate, self.combined_inputs[t].T)
            d_forget_gate_bias += d_forget_gate

            d_input_gate = (
                d_cell
                * self.cell_state_candidates[t]
                * self.sigmoid(self.input_gate_activations[t], derivative=True)
            )
            d_input_gate_weights += np.dot(d_input_gate, self.combined_inputs[t].T)
            d_input_gate_bias += d_input_gate

            d_cell_candidate = (
                d_cell
                * self.input_gate_activations[t]
                * self.tanh(self.cell_state_candidates[t], derivative=True)
            )
            d_cell_candidate_weights += np.dot(
                d_cell_candidate, self.combined_inputs[t].T
            )
            d_cell_candidate_bias += d_cell_candidate

            d_combined_input = (
                np.dot(self.forget_gate_weights.T, d_forget_gate)
                + np.dot(self.input_gate_weights.T, d_input_gate)
                + np.dot(self.cell_candidate_weights.T, d_cell_candidate)
                + np.dot(self.output_gate_weights.T, d_output_gate)
            )

            d_next_hidden = d_combined_input[: self.hidden_layer_size, :]
            d_next_cell = self.forget_gate_activations[t] * d_cell
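        # Clip every accumulated gradient to [-1, 1] in place to limit
        # exploding gradients before applying the updates.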
        for d in (
            d_forget_gate_weights,
            d_forget_gate_bias,
            d_input_gate_weights,
            d_input_gate_bias,
            d_cell_candidate_weights,
            d_cell_candidate_bias,
            d_output_gate_weights,
            d_output_gate_bias,
            d_output_layer_weights,
            d_output_layer_bias,
        ):
            np.clip(d, -1, 1, out=d)

        self.forget_gate_weights += d_forget_gate_weights * self.learning_rate
        self.forget_gate_bias += d_forget_gate_bias * self.learning_rate
        self.input_gate_weights += d_input_gate_weights * self.learning_rate
        self.input_gate_bias += d_input_gate_bias * self.learning_rate
        self.cell_candidate_weights += d_cell_candidate_weights * self.learning_rate
        self.cell_candidate_bias += d_cell_candidate_bias * self.learning_rate
        self.output_gate_weights += d_output_gate_weights * self.learning_rate
        self.output_gate_bias += d_output_gate_bias * self.learning_rate
        self.output_layer_weights += d_output_layer_weights * self.learning_rate
        self.output_layer_bias += d_output_layer_bias * self.learning_rate

    def train(self) -> None:
        """
        Train the LSTM network on the input data.

        >>> lstm = LongShortTermMemory("abcde" * 50, hidden_layer_size=10,
        ...     training_epochs=5, learning_rate=0.01)
        Data length: 250, Vocabulary size: 5
        >>> lstm.train()
        >>> isinstance(lstm.forget_gate_weights, np.ndarray)
        True
        """
        inputs = [self.one_hot_encode(char) for char in self.input_sequence]

        for _ in range(self.training_epochs):
            predictions = self.forward_pass(inputs)
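            # error = one_hot(target) - softmax(prediction), the negative
            # cross-entropy gradient; backward_pass applies it with "+=".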
            errors = []
            for t in range(len(predictions)):
                errors.append(-self.softmax(predictions[t]))
                errors[-1][self.char_to_index[self.target_sequence[t]]] += 1

            self.backward_pass(errors, inputs)

    def test(self) -> str:
        """
        Test the trained LSTM network on the input data and print the accuracy.

        >>> lstm = LongShortTermMemory("abcde" * 50, hidden_layer_size=10,
        ...     training_epochs=5, learning_rate=0.01)
        Data length: 250, Vocabulary size: 5
        >>> lstm.train()
        >>> predictions = lstm.test()  # doctest: +ELLIPSIS
        Ground Truth:
        ...
        >>> isinstance(predictions, str)
        True
        >>> len(predictions) == len(lstm.input_sequence)
        True
        """
        accuracy = 0
        probabilities = self.forward_pass(
            [self.one_hot_encode(char) for char in self.input_sequence]
        )
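        # Sample each predicted character from the softmax distribution instead
        # of taking the argmax, so repeated test runs can differ slightly.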
output = ""
for t in range(len(self.target_sequence)):
probs = self.softmax(probabilities[t].reshape(-1))
prediction_index = self.random_generator.choice(
self.vocabulary_size, p=probs
)
prediction = self.index_to_char[prediction_index]
output += prediction
if prediction == self.target_sequence[t]:
accuracy += 1
print(f"Ground Truth:\n{self.target_sequence}\n")
print(f"Predictions:\n{output}\n")
print(f"Accuracy: {round(accuracy * 100 / len(self.input_sequence), 2)}%")
return output


def test_lstm_workflow() -> None:
    """
    Test the full LSTM workflow including initialization, training, and testing.

    >>> lstm = LongShortTermMemory("abcde" * 50, hidden_layer_size=10,
    ...     training_epochs=5, learning_rate=0.01)
    Data length: 250, Vocabulary size: 5
    >>> lstm.train()
    >>> predictions = lstm.test()  # doctest: +ELLIPSIS
    Ground Truth:
    ...
    >>> len(predictions) > 0
    True
    >>> all(c in 'abcde' for c in predictions)
    True
    """


if __name__ == "__main__":
    sample_data = """Long Short-Term Memory (LSTM) networks are a type
    of recurrent neural network (RNN) capable of learning
    order dependence in sequence prediction problems.
    This behavior is required in complex problem domains like
    machine translation, speech recognition, and more.
    LSTMs were introduced by Hochreiter and Schmidhuber in 1997, and were
    refined and popularized by many people in following work."""
    import doctest

    doctest.testmod()

    # lstm_model = LongShortTermMemory(
    #     input_data=sample_data,
    #     hidden_layer_size=25,
    #     training_epochs=100,
    #     learning_rate=0.05,
    # )

    ##### Training #####
    # lstm_model.train()

    ##### Testing #####
    # lstm_model.test()
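    # NOTE: the commented-out driver above runs 100 epochs over the
    # sample_data paragraph, which may take a while compared with the doctests.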