Python/graphs/bi_directional_dijkstra.py
Tianyi Zheng ae0fc85401
Fix ruff errors (#8936)
* Fix ruff errors

Renamed neural_network/input_data.py to neural_network/input_data.py_tf
because it should be left out of the directory for the following
reasons:

1. Its sole purpose is to be used by neural_network/gan.py_tf, which is
   itself left out of the directory because of issues with TensorFlow.

2. It was taken directly from TensorFlow's codebase and is actually
   already deprecated. If/when neural_network/gan.py_tf is eventually
   re-added back to the directory, its implementation should be changed
   to not use neural_network/input_data.py anyway.

* updating DIRECTORY.md

* Change input_data.py_tf file extension

Change input_data.py_tf file extension because algorithms-keeper bot is being picky about it

---------

Co-authored-by: github-actions <${GITHUB_ACTOR}@users.noreply.github.com>
2023-08-09 13:25:30 +05:30

140 lines
3.3 KiB
Python

"""
Bi-directional Dijkstra's algorithm.
A bi-directional approach is an efficient and
less time consuming optimization for Dijkstra's
searching algorithm
Reference: shorturl.at/exHM7
"""
# Author: Swayam Singh (https://github.com/practice404)
from queue import PriorityQueue
from typing import Any
import numpy as np
def pass_and_relaxation(
graph: dict,
v: str,
visited_forward: set,
visited_backward: set,
cst_fwd: dict,
cst_bwd: dict,
queue: PriorityQueue,
parent: dict,
shortest_distance: float,
) -> float:
for nxt, d in graph[v]:
if nxt in visited_forward:
continue
old_cost_f = cst_fwd.get(nxt, np.inf)
new_cost_f = cst_fwd[v] + d
if new_cost_f < old_cost_f:
queue.put((new_cost_f, nxt))
cst_fwd[nxt] = new_cost_f
parent[nxt] = v
if nxt in visited_backward:
if cst_fwd[v] + d + cst_bwd[nxt] < shortest_distance:
shortest_distance = cst_fwd[v] + d + cst_bwd[nxt]
return shortest_distance
def bidirectional_dij(
source: str, destination: str, graph_forward: dict, graph_backward: dict
) -> int:
"""
Bi-directional Dijkstra's algorithm.
Returns:
shortest_path_distance (int): length of the shortest path.
Warnings:
If the destination is not reachable, function returns -1
>>> bidirectional_dij("E", "F", graph_fwd, graph_bwd)
3
"""
shortest_path_distance = -1
visited_forward = set()
visited_backward = set()
cst_fwd = {source: 0}
cst_bwd = {destination: 0}
parent_forward = {source: None}
parent_backward = {destination: None}
queue_forward: PriorityQueue[Any] = PriorityQueue()
queue_backward: PriorityQueue[Any] = PriorityQueue()
shortest_distance = np.inf
queue_forward.put((0, source))
queue_backward.put((0, destination))
if source == destination:
return 0
while not queue_forward.empty() and not queue_backward.empty():
_, v_fwd = queue_forward.get()
visited_forward.add(v_fwd)
_, v_bwd = queue_backward.get()
visited_backward.add(v_bwd)
shortest_distance = pass_and_relaxation(
graph_forward,
v_fwd,
visited_forward,
visited_backward,
cst_fwd,
cst_bwd,
queue_forward,
parent_forward,
shortest_distance,
)
shortest_distance = pass_and_relaxation(
graph_backward,
v_bwd,
visited_backward,
visited_forward,
cst_bwd,
cst_fwd,
queue_backward,
parent_backward,
shortest_distance,
)
if cst_fwd[v_fwd] + cst_bwd[v_bwd] >= shortest_distance:
break
if shortest_distance != np.inf:
shortest_path_distance = shortest_distance
return shortest_path_distance
graph_fwd = {
"B": [["C", 1]],
"C": [["D", 1]],
"D": [["F", 1]],
"E": [["B", 1], ["G", 2]],
"F": [],
"G": [["F", 1]],
}
graph_bwd = {
"B": [["E", 1]],
"C": [["B", 1]],
"D": [["C", 1]],
"F": [["D", 1], ["G", 1]],
"E": [[None, np.inf]],
"G": [["E", 2]],
}
if __name__ == "__main__":
import doctest
doctest.testmod()