"""
Demonstration of automatic differentiation (reverse mode).

Reference: https://en.wikipedia.org/wiki/Automatic_differentiation

Author: Poojan Smart
Email: smrtpoojan@gmail.com
"""

from __future__ import annotations

from collections import defaultdict
from enum import Enum
from types import TracebackType
from typing import Any

import numpy as np
from typing_extensions import Self  # noqa: UP035


class OpType(Enum):
    """
    Enumerates the operations supported on Variable objects for gradient
    calculation.
    """

    ADD = 0
    SUB = 1
    MUL = 2
    DIV = 3
    MATMUL = 4
    POWER = 5
    NOOP = 6


class Variable:
    """
    Wraps an n-dimensional numpy array; operations performed on Variable
    objects are recorded so that gradients can be calculated.

    Examples:
    >>> Variable(5.0)
    Variable(5.0)
    >>> Variable([5.0, 2.9])
    Variable([5.  2.9])
    >>> Variable([5.0, 2.9]) + Variable([1.0, 5.5])
    Variable([6.  8.4])
    >>> Variable([[8.0, 10.0]])
    Variable([[ 8. 10.]])
    """

    def __init__(self, value: Any) -> None:
        self.value = np.array(value)

        # pointers to the operations to which the Variable is an input
        self.param_to: list[Operation] = []
        # pointer to the operation of which the Variable is the output
        self.result_of: Operation = Operation(OpType.NOOP)

    def __repr__(self) -> str:
        return f"Variable({self.value})"

    def to_ndarray(self) -> np.ndarray:
        return self.value
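
    # Each operator overload below follows the same pattern: compute the
    # result eagerly with numpy, then record the operation on the singleton
    # GradientTracker so that it becomes a node of the computation graph
    # used for reverse-mode differentiation.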

    def __add__(self, other: Variable) -> Variable:
        result = Variable(self.value + other.value)

        with GradientTracker() as tracker:
            # if tracker is enabled, computation graph will be updated
            if tracker.enabled:
                tracker.append(OpType.ADD, params=[self, other], output=result)
        return result

    def __sub__(self, other: Variable) -> Variable:
        result = Variable(self.value - other.value)

        with GradientTracker() as tracker:
            # if tracker is enabled, computation graph will be updated
            if tracker.enabled:
                tracker.append(OpType.SUB, params=[self, other], output=result)
        return result

    def __mul__(self, other: Variable) -> Variable:
        result = Variable(self.value * other.value)

        with GradientTracker() as tracker:
            # if tracker is enabled, computation graph will be updated
            if tracker.enabled:
                tracker.append(OpType.MUL, params=[self, other], output=result)
        return result

    def __truediv__(self, other: Variable) -> Variable:
        result = Variable(self.value / other.value)

        with GradientTracker() as tracker:
            # if tracker is enabled, computation graph will be updated
            if tracker.enabled:
                tracker.append(OpType.DIV, params=[self, other], output=result)
        return result

    def __matmul__(self, other: Variable) -> Variable:
        result = Variable(self.value @ other.value)

        with GradientTracker() as tracker:
            # if tracker is enabled, computation graph will be updated
            if tracker.enabled:
                tracker.append(OpType.MATMUL, params=[self, other], output=result)
        return result

    def __pow__(self, power: int) -> Variable:
        result = Variable(self.value**power)

        with GradientTracker() as tracker:
            # if tracker is enabled, computation graph will be updated
            if tracker.enabled:
                tracker.append(
                    OpType.POWER,
                    params=[self],
                    output=result,
                    other_params={"power": power},
                )
        return result

    def add_param_to(self, param_to: Operation) -> None:
        self.param_to.append(param_to)

    def add_result_of(self, result_of: Operation) -> None:
        self.result_of = result_of


class Operation:
    """
    Represents an operation on one or two Variable objects.
    An Operation object contains the type of the operation, pointers to
    the input Variable objects, and a pointer to the Variable that
    results from the operation.
    """

    def __init__(
        self,
        op_type: OpType,
        other_params: dict | None = None,
    ) -> None:
        self.op_type = op_type
        self.other_params = {} if other_params is None else other_params

    def add_params(self, params: list[Variable]) -> None:
        self.params = params

    def add_output(self, output: Variable) -> None:
        self.output = output
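
    # NOTE: comparing an Operation directly against an OpType member (as
    # done in GradientTracker.derivative) compares the stored op_type.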
    def __eq__(self, value) -> bool:
        return self.op_type == value if isinstance(value, OpType) else False


class GradientTracker:
    """
    Computes partial derivatives of Variable objects by traversing the
    computation graph.

    Examples:

    >>> with GradientTracker() as tracker:
    ...     a = Variable([2.0, 5.0])
    ...     b = Variable([1.0, 2.0])
    ...     m = Variable([1.0, 2.0])
    ...     c = a + b
    ...     d = a * b
    ...     e = c / d
    >>> tracker.gradient(e, a)
    array([-0.25, -0.04])
    >>> tracker.gradient(e, b)
    array([-1.  , -0.25])
    >>> tracker.gradient(e, m) is None
    True

    >>> with GradientTracker() as tracker:
    ...     a = Variable([[2.0, 5.0]])
    ...     b = Variable([[1.0], [2.0]])
    ...     c = a @ b
    >>> tracker.gradient(c, a)
    array([[1., 2.]])
    >>> tracker.gradient(c, b)
    array([[2.],
           [5.]])

    >>> with GradientTracker() as tracker:
    ...     a = Variable([[2.0, 5.0]])
    ...     b = a ** 3
    >>> tracker.gradient(b, a)
    array([[12., 75.]])
    """

    instance = None

    def __new__(cls) -> Self:
        """
        Returns the existing instance if the object has already been
        created; otherwise creates it. The class follows the singleton
        design pattern.
        """
        if cls.instance is None:
            cls.instance = super().__new__(cls)
        return cls.instance
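
    # Because of the singleton pattern, every "with GradientTracker() as
    # tracker" block (including the ones inside Variable's operator
    # overloads) shares the same tracker object and hence the same
    # enabled flag.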

    def __init__(self) -> None:
        self.enabled = False

    def __enter__(self) -> Self:
        self.enabled = True
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        self.enabled = False

    def append(
        self,
        op_type: OpType,
        params: list[Variable],
        output: Variable,
        other_params: dict | None = None,
    ) -> None:
        """
        Adds an Operation object to the related Variable objects, building
        the computation graph used for calculating gradients.

        Args:
            op_type: Operation type
            params: Input parameters to the operation
            output: Output variable of the operation
            other_params: Extra parameters of the operation, e.g. the
                exponent for OpType.POWER
        """
        operation = Operation(op_type, other_params=other_params)
        param_nodes = []
        for param in params:
            param.add_param_to(operation)
            param_nodes.append(param)
        output.add_result_of(operation)

        operation.add_params(param_nodes)
        operation.add_output(output)
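
    # After append, the graph is doubly linked: each input Variable records
    # the operations it feeds (param_to), and the output Variable records
    # the operation that produced it (result_of).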

    def gradient(self, target: Variable, source: Variable) -> np.ndarray | None:
        """
        Reverse accumulation of partial derivatives to calculate gradients
        of the target variable with respect to the source variable.

        Args:
            target: target variable for which gradients are calculated.
            source: source variable with respect to which the gradients are
                calculated.

        Returns:
            Gradient of the target variable with respect to the source
            variable, or None if the source is not part of the target's
            computation graph.
        """

        # partial derivatives of the target with respect to each variable
        partial_deriv = defaultdict(lambda: 0)
        partial_deriv[target] = np.ones_like(target.to_ndarray())

        # iterate through each operation in the computation graph
        operation_queue = [target.result_of]
        while len(operation_queue) > 0:
            operation = operation_queue.pop()
            for param in operation.params:
                # as per the chain rule, multiply the local derivative of the
                # operation by the partial derivative of its output with
                # respect to the target
                doutput_dparam = self.derivative(param, operation)
                dtarget_dparam = doutput_dparam * partial_deriv[operation.output]
                partial_deriv[param] += dtarget_dparam

                if param.result_of and param.result_of != OpType.NOOP:
                    operation_queue.append(param.result_of)

        return partial_deriv.get(source)

    def derivative(self, param: Variable, operation: Operation) -> np.ndarray:
        """
        Computes the local derivative of the given operation/function.

        Args:
            param: input variable with respect to which the derivative
                is taken
            operation: operation performed on the input variable

        Returns:
            Derivative of the output of the operation with respect to the
            input variable
        """
        params = operation.params
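
        # Local derivative rules, d(output)/d(param), evaluated at the
        # stored values:
        #   ADD:    d(a + b)/da = d(a + b)/db = 1
        #   SUB:    d(a - b)/da = 1,  d(a - b)/db = -1
        #   MUL:    d(a * b)/da = b,  d(a * b)/db = a   (elementwise)
        #   DIV:    d(a / b)/da = 1/b,  d(a / b)/db = -a / b**2
        #   MATMUL: for c = a @ b, b.T with respect to a, and a.T with
        #           respect to b
        #   POWER:  d(a**n)/da = n * a**(n - 1)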

        if operation == OpType.ADD:
            return np.ones_like(params[0].to_ndarray(), dtype=np.float64)
        if operation == OpType.SUB:
            if params[0] == param:
                return np.ones_like(params[0].to_ndarray(), dtype=np.float64)
            return -np.ones_like(params[1].to_ndarray(), dtype=np.float64)
        if operation == OpType.MUL:
            # elementwise product: the derivative is simply the other
            # operand (no transpose, unlike MATMUL)
            return (
                params[1].to_ndarray()
                if params[0] == param
                else params[0].to_ndarray()
            )
        if operation == OpType.DIV:
            if params[0] == param:
                return 1 / params[1].to_ndarray()
            return -params[0].to_ndarray() / (params[1].to_ndarray() ** 2)
        if operation == OpType.MATMUL:
            return (
                params[1].to_ndarray().T
                if params[0] == param
                else params[0].to_ndarray().T
            )
        if operation == OpType.POWER:
            power = operation.other_params["power"]
            return power * (params[0].to_ndarray() ** (power - 1))

        err_msg = f"invalid operation type: {operation.op_type}"
        raise ValueError(err_msg)


if __name__ == "__main__":
    import doctest

    doctest.testmod()
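
    # A minimal end-to-end sketch (illustrative; the names x, y and f are
    # arbitrary). For f = x * y + x, the hand-derived gradients are
    # df/dx = y + 1 and df/dy = x.
    with GradientTracker() as tracker:
        x = Variable([2.0])
        y = Variable([3.0])
        f = x * y + x
    print(tracker.gradient(f, x))  # [4.]
    print(tracker.gradient(f, y))  # [2.]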