From 5987f861926c7560cd46c1e33c3cc2c0506c0ee1 Mon Sep 17 00:00:00 2001 From: Poojan Smart <44301271+PoojanSmart@users.noreply.github.com> Date: Fri, 27 Oct 2023 14:17:24 +0530 Subject: [PATCH] Add automatic differentiation algorithm (#10977) * Added automatic differentiation algorithm * file name changed * Resolved pre commit errors * updated dependency * added noqa for ignoring check * adding typing_extension for adding Self type in __new__ * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * sorted requirement.text dependency * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * resolved ruff --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- machine_learning/automatic_differentiation.py | 327 ++++++++++++++++++ requirements.txt | 3 +- 2 files changed, 329 insertions(+), 1 deletion(-) create mode 100644 machine_learning/automatic_differentiation.py diff --git a/machine_learning/automatic_differentiation.py b/machine_learning/automatic_differentiation.py new file mode 100644 index 000000000..cd2e5cdaa --- /dev/null +++ b/machine_learning/automatic_differentiation.py @@ -0,0 +1,327 @@ +""" +Demonstration of the Automatic Differentiation (Reverse mode). + +Reference: https://en.wikipedia.org/wiki/Automatic_differentiation + +Author: Poojan Smart +Email: smrtpoojan@gmail.com +""" +from __future__ import annotations + +from collections import defaultdict +from enum import Enum +from types import TracebackType +from typing import Any + +import numpy as np +from typing_extensions import Self # noqa: UP035 + + +class OpType(Enum): + """ + Class represents list of supported operations on Variable for gradient calculation. + """ + + ADD = 0 + SUB = 1 + MUL = 2 + DIV = 3 + MATMUL = 4 + POWER = 5 + NOOP = 6 + + +class Variable: + """ + Class represents n-dimensional object which is used to wrap numpy array on which + operations will be performed and the gradient will be calculated. + + Examples: + >>> Variable(5.0) + Variable(5.0) + >>> Variable([5.0, 2.9]) + Variable([5. 2.9]) + >>> Variable([5.0, 2.9]) + Variable([1.0, 5.5]) + Variable([6. 8.4]) + >>> Variable([[8.0, 10.0]]) + Variable([[ 8. 10.]]) + """ + + def __init__(self, value: Any) -> None: + self.value = np.array(value) + + # pointers to the operations to which the Variable is input + self.param_to: list[Operation] = [] + # pointer to the operation of which the Variable is output of + self.result_of: Operation = Operation(OpType.NOOP) + + def __repr__(self) -> str: + return f"Variable({self.value})" + + def to_ndarray(self) -> np.ndarray: + return self.value + + def __add__(self, other: Variable) -> Variable: + result = Variable(self.value + other.value) + + with GradientTracker() as tracker: + # if tracker is enabled, computation graph will be updated + if tracker.enabled: + tracker.append(OpType.ADD, params=[self, other], output=result) + return result + + def __sub__(self, other: Variable) -> Variable: + result = Variable(self.value - other.value) + + with GradientTracker() as tracker: + # if tracker is enabled, computation graph will be updated + if tracker.enabled: + tracker.append(OpType.SUB, params=[self, other], output=result) + return result + + def __mul__(self, other: Variable) -> Variable: + result = Variable(self.value * other.value) + + with GradientTracker() as tracker: + # if tracker is enabled, computation graph will be updated + if tracker.enabled: + tracker.append(OpType.MUL, params=[self, other], output=result) + return result + + def __truediv__(self, other: Variable) -> Variable: + result = Variable(self.value / other.value) + + with GradientTracker() as tracker: + # if tracker is enabled, computation graph will be updated + if tracker.enabled: + tracker.append(OpType.DIV, params=[self, other], output=result) + return result + + def __matmul__(self, other: Variable) -> Variable: + result = Variable(self.value @ other.value) + + with GradientTracker() as tracker: + # if tracker is enabled, computation graph will be updated + if tracker.enabled: + tracker.append(OpType.MATMUL, params=[self, other], output=result) + return result + + def __pow__(self, power: int) -> Variable: + result = Variable(self.value**power) + + with GradientTracker() as tracker: + # if tracker is enabled, computation graph will be updated + if tracker.enabled: + tracker.append( + OpType.POWER, + params=[self], + output=result, + other_params={"power": power}, + ) + return result + + def add_param_to(self, param_to: Operation) -> None: + self.param_to.append(param_to) + + def add_result_of(self, result_of: Operation) -> None: + self.result_of = result_of + + +class Operation: + """ + Class represents operation between single or two Variable objects. + Operation objects contains type of operation, pointers to input Variable + objects and pointer to resulting Variable from the operation. + """ + + def __init__( + self, + op_type: OpType, + other_params: dict | None = None, + ) -> None: + self.op_type = op_type + self.other_params = {} if other_params is None else other_params + + def add_params(self, params: list[Variable]) -> None: + self.params = params + + def add_output(self, output: Variable) -> None: + self.output = output + + def __eq__(self, value) -> bool: + return self.op_type == value if isinstance(value, OpType) else False + + +class GradientTracker: + """ + Class contains methods to compute partial derivatives of Variable + based on the computation graph. + + Examples: + + >>> with GradientTracker() as tracker: + ... a = Variable([2.0, 5.0]) + ... b = Variable([1.0, 2.0]) + ... m = Variable([1.0, 2.0]) + ... c = a + b + ... d = a * b + ... e = c / d + >>> tracker.gradient(e, a) + array([-0.25, -0.04]) + >>> tracker.gradient(e, b) + array([-1. , -0.25]) + >>> tracker.gradient(e, m) is None + True + + >>> with GradientTracker() as tracker: + ... a = Variable([[2.0, 5.0]]) + ... b = Variable([[1.0], [2.0]]) + ... c = a @ b + >>> tracker.gradient(c, a) + array([[1., 2.]]) + >>> tracker.gradient(c, b) + array([[2.], + [5.]]) + + >>> with GradientTracker() as tracker: + ... a = Variable([[2.0, 5.0]]) + ... b = a ** 3 + >>> tracker.gradient(b, a) + array([[12., 75.]]) + """ + + instance = None + + def __new__(cls) -> Self: + """ + Executes at the creation of class object and returns if + object is already created. This class follows singleton + design pattern. + """ + if cls.instance is None: + cls.instance = super().__new__(cls) + return cls.instance + + def __init__(self) -> None: + self.enabled = False + + def __enter__(self) -> Self: + self.enabled = True + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc: BaseException | None, + traceback: TracebackType | None, + ) -> None: + self.enabled = False + + def append( + self, + op_type: OpType, + params: list[Variable], + output: Variable, + other_params: dict | None = None, + ) -> None: + """ + Adds Operation object to the related Variable objects for + creating computational graph for calculating gradients. + + Args: + op_type: Operation type + params: Input parameters to the operation + output: Output variable of the operation + """ + operation = Operation(op_type, other_params=other_params) + param_nodes = [] + for param in params: + param.add_param_to(operation) + param_nodes.append(param) + output.add_result_of(operation) + + operation.add_params(param_nodes) + operation.add_output(output) + + def gradient(self, target: Variable, source: Variable) -> np.ndarray | None: + """ + Reverse accumulation of partial derivatives to calculate gradients + of target variable with respect to source variable. + + Args: + target: target variable for which gradients are calculated. + source: source variable with respect to which the gradients are + calculated. + + Returns: + Gradient of the source variable with respect to the target variable + """ + + # partial derivatives with respect to target + partial_deriv = defaultdict(lambda: 0) + partial_deriv[target] = np.ones_like(target.to_ndarray()) + + # iterating through each operations in the computation graph + operation_queue = [target.result_of] + while len(operation_queue) > 0: + operation = operation_queue.pop() + for param in operation.params: + # as per the chain rule, multiplying partial derivatives + # of variables with respect to the target + dparam_doutput = self.derivative(param, operation) + dparam_dtarget = dparam_doutput * partial_deriv[operation.output] + partial_deriv[param] += dparam_dtarget + + if param.result_of and param.result_of != OpType.NOOP: + operation_queue.append(param.result_of) + + return partial_deriv.get(source) + + def derivative(self, param: Variable, operation: Operation) -> np.ndarray: + """ + Compute the derivative of given operation/function + + Args: + param: variable to be differentiated + operation: function performed on the input variable + + Returns: + Derivative of input variable with respect to the output of + the operation + """ + params = operation.params + + if operation == OpType.ADD: + return np.ones_like(params[0].to_ndarray(), dtype=np.float64) + if operation == OpType.SUB: + if params[0] == param: + return np.ones_like(params[0].to_ndarray(), dtype=np.float64) + return -np.ones_like(params[1].to_ndarray(), dtype=np.float64) + if operation == OpType.MUL: + return ( + params[1].to_ndarray().T + if params[0] == param + else params[0].to_ndarray().T + ) + if operation == OpType.DIV: + if params[0] == param: + return 1 / params[1].to_ndarray() + return -params[0].to_ndarray() / (params[1].to_ndarray() ** 2) + if operation == OpType.MATMUL: + return ( + params[1].to_ndarray().T + if params[0] == param + else params[0].to_ndarray().T + ) + if operation == OpType.POWER: + power = operation.other_params["power"] + return power * (params[0].to_ndarray() ** (power - 1)) + + err_msg = f"invalid operation type: {operation.op_type}" + raise ValueError(err_msg) + + +if __name__ == "__main__": + import doctest + + doctest.testmod() diff --git a/requirements.txt b/requirements.txt index 05d9f1e8c..8937f6bb0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,5 +19,6 @@ statsmodels sympy tensorflow ; python_version < '3.12' tweepy -xgboost # yulewalker # uncomment once audio_filters/equal_loudness_filter.py is fixed +typing_extensions +xgboost