mirror of
https://github.com/TheAlgorithms/Python.git
synced 2025-01-07 02:47:01 +00:00
bc8df6de31
* [pre-commit.ci] pre-commit autoupdate updates: - [github.com/astral-sh/ruff-pre-commit: v0.2.2 → v0.3.2](https://github.com/astral-sh/ruff-pre-commit/compare/v0.2.2...v0.3.2) - [github.com/pre-commit/mirrors-mypy: v1.8.0 → v1.9.0](https://github.com/pre-commit/mirrors-mypy/compare/v1.8.0...v1.9.0) * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
132 lines
4.2 KiB
Python
132 lines
4.2 KiB
Python
"""
|
|
The Horn-Schunck method estimates the optical flow for every single pixel of
|
|
a sequence of images.
|
|
It works by assuming brightness constancy between two consecutive frames
|
|
and smoothness in the optical flow.
|
|
|
|
Useful resources:
|
|
Wikipedia: https://en.wikipedia.org/wiki/Horn%E2%80%93Schunck_method
|
|
Paper: http://image.diku.dk/imagecanon/material/HornSchunckOptical_Flow.pdf
|
|
"""
|
|
|
|
from typing import SupportsIndex
|
|
|
|
import numpy as np
|
|
from scipy.ndimage import convolve
|
|
|
|
|
|
def warp(
|
|
image: np.ndarray, horizontal_flow: np.ndarray, vertical_flow: np.ndarray
|
|
) -> np.ndarray:
|
|
"""
|
|
Warps the pixels of an image into a new image using the horizontal and vertical
|
|
flows.
|
|
Pixels that are warped from an invalid location are set to 0.
|
|
|
|
Parameters:
|
|
image: Grayscale image
|
|
horizontal_flow: Horizontal flow
|
|
vertical_flow: Vertical flow
|
|
|
|
Returns: Warped image
|
|
|
|
>>> warp(np.array([[0, 1, 2], [0, 3, 0], [2, 2, 2]]), \
|
|
np.array([[0, 1, -1], [-1, 0, 0], [1, 1, 1]]), \
|
|
np.array([[0, 0, 0], [0, 1, 0], [0, 0, 1]]))
|
|
array([[0, 0, 0],
|
|
[3, 1, 0],
|
|
[0, 2, 3]])
|
|
"""
|
|
flow = np.stack((horizontal_flow, vertical_flow), 2)
|
|
|
|
# Create a grid of all pixel coordinates and subtract the flow to get the
|
|
# target pixels coordinates
|
|
grid = np.stack(
|
|
np.meshgrid(np.arange(0, image.shape[1]), np.arange(0, image.shape[0])), 2
|
|
)
|
|
grid = np.round(grid - flow).astype(np.int32)
|
|
|
|
# Find the locations outside of the original image
|
|
invalid = (grid < 0) | (grid >= np.array([image.shape[1], image.shape[0]]))
|
|
grid[invalid] = 0
|
|
|
|
warped = image[grid[:, :, 1], grid[:, :, 0]]
|
|
|
|
# Set pixels at invalid locations to 0
|
|
warped[invalid[:, :, 0] | invalid[:, :, 1]] = 0
|
|
|
|
return warped
|
|
|
|
|
|
def horn_schunck(
|
|
image0: np.ndarray,
|
|
image1: np.ndarray,
|
|
num_iter: SupportsIndex,
|
|
alpha: float | None = None,
|
|
) -> tuple[np.ndarray, np.ndarray]:
|
|
"""
|
|
This function performs the Horn-Schunck algorithm and returns the estimated
|
|
optical flow. It is assumed that the input images are grayscale and
|
|
normalized to be in [0, 1].
|
|
|
|
Parameters:
|
|
image0: First image of the sequence
|
|
image1: Second image of the sequence
|
|
alpha: Regularization constant
|
|
num_iter: Number of iterations performed
|
|
|
|
Returns: estimated horizontal & vertical flow
|
|
|
|
>>> np.round(horn_schunck(np.array([[0, 0, 2], [0, 0, 2]]), \
|
|
np.array([[0, 2, 0], [0, 2, 0]]), alpha=0.1, num_iter=110)).\
|
|
astype(np.int32)
|
|
array([[[ 0, -1, -1],
|
|
[ 0, -1, -1]],
|
|
<BLANKLINE>
|
|
[[ 0, 0, 0],
|
|
[ 0, 0, 0]]], dtype=int32)
|
|
"""
|
|
if alpha is None:
|
|
alpha = 0.1
|
|
|
|
# Initialize flow
|
|
horizontal_flow = np.zeros_like(image0)
|
|
vertical_flow = np.zeros_like(image0)
|
|
|
|
# Prepare kernels for the calculation of the derivatives and the average velocity
|
|
kernel_x = np.array([[-1, 1], [-1, 1]]) * 0.25
|
|
kernel_y = np.array([[-1, -1], [1, 1]]) * 0.25
|
|
kernel_t = np.array([[1, 1], [1, 1]]) * 0.25
|
|
kernel_laplacian = np.array(
|
|
[[1 / 12, 1 / 6, 1 / 12], [1 / 6, 0, 1 / 6], [1 / 12, 1 / 6, 1 / 12]]
|
|
)
|
|
|
|
# Iteratively refine the flow
|
|
for _ in range(num_iter):
|
|
warped_image = warp(image0, horizontal_flow, vertical_flow)
|
|
derivative_x = convolve(warped_image, kernel_x) + convolve(image1, kernel_x)
|
|
derivative_y = convolve(warped_image, kernel_y) + convolve(image1, kernel_y)
|
|
derivative_t = convolve(warped_image, kernel_t) + convolve(image1, -kernel_t)
|
|
|
|
avg_horizontal_velocity = convolve(horizontal_flow, kernel_laplacian)
|
|
avg_vertical_velocity = convolve(vertical_flow, kernel_laplacian)
|
|
|
|
# This updates the flow as proposed in the paper (Step 12)
|
|
update = (
|
|
derivative_x * avg_horizontal_velocity
|
|
+ derivative_y * avg_vertical_velocity
|
|
+ derivative_t
|
|
)
|
|
update = update / (alpha**2 + derivative_x**2 + derivative_y**2)
|
|
|
|
horizontal_flow = avg_horizontal_velocity - derivative_x * update
|
|
vertical_flow = avg_vertical_velocity - derivative_y * update
|
|
|
|
return horizontal_flow, vertical_flow
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import doctest
|
|
|
|
doctest.testmod()
|