Python/graphs/dijkstra_algorithm.py
pre-commit-ci[bot] 82aa909db7
[pre-commit.ci] pre-commit autoupdate (#11417)
* [pre-commit.ci] pre-commit autoupdate

updates:
- [github.com/astral-sh/ruff-pre-commit: v0.4.4 → v0.4.5](https://github.com/astral-sh/ruff-pre-commit/compare/v0.4.4...v0.4.5)
- [github.com/codespell-project/codespell: v2.2.6 → v2.3.0](https://github.com/codespell-project/codespell/compare/v2.2.6...v2.3.0)
- [github.com/tox-dev/pyproject-fmt: 2.1.1 → 2.1.3](https://github.com/tox-dev/pyproject-fmt/compare/2.1.1...2.1.3)

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* iterable

* at most

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Christian Clauss <cclauss@me.com>
2024-05-27 21:56:48 -04:00

497 lines
15 KiB
Python

# Title: Dijkstra's Algorithm for finding single source shortest path from scratch
# Author: Shubham Malik
# References: https://en.wikipedia.org/wiki/Dijkstra%27s_algorithm
import math
import sys
# For storing the vertex set to retrieve node with the lowest distance
class PriorityQueue:
# Based on Min Heap
def __init__(self):
"""
Priority queue class constructor method.
Examples:
>>> priority_queue_test = PriorityQueue()
>>> priority_queue_test.cur_size
0
>>> priority_queue_test.array
[]
>>> priority_queue_test.pos
{}
"""
self.cur_size = 0
self.array = []
self.pos = {} # To store the pos of node in array
def is_empty(self):
"""
Conditional boolean method to determine if the priority queue is empty or not.
Examples:
>>> priority_queue_test = PriorityQueue()
>>> priority_queue_test.is_empty()
True
>>> priority_queue_test.insert((2, 'A'))
>>> priority_queue_test.is_empty()
False
"""
return self.cur_size == 0
def min_heapify(self, idx):
"""
Sorts the queue array so that the minimum element is root.
Examples:
>>> priority_queue_test = PriorityQueue()
>>> priority_queue_test.cur_size = 3
>>> priority_queue_test.pos = {'A': 0, 'B': 1, 'C': 2}
>>> priority_queue_test.array = [(5, 'A'), (10, 'B'), (15, 'C')]
>>> priority_queue_test.min_heapify(0)
Traceback (most recent call last):
...
TypeError: 'list' object is not callable
>>> priority_queue_test.array
[(5, 'A'), (10, 'B'), (15, 'C')]
>>> priority_queue_test.array = [(10, 'A'), (5, 'B'), (15, 'C')]
>>> priority_queue_test.min_heapify(0)
Traceback (most recent call last):
...
TypeError: 'list' object is not callable
>>> priority_queue_test.array
[(10, 'A'), (5, 'B'), (15, 'C')]
>>> priority_queue_test.array = [(10, 'A'), (15, 'B'), (5, 'C')]
>>> priority_queue_test.min_heapify(0)
Traceback (most recent call last):
...
TypeError: 'list' object is not callable
>>> priority_queue_test.array
[(10, 'A'), (15, 'B'), (5, 'C')]
>>> priority_queue_test.array = [(10, 'A'), (5, 'B')]
>>> priority_queue_test.cur_size = len(priority_queue_test.array)
>>> priority_queue_test.pos = {'A': 0, 'B': 1}
>>> priority_queue_test.min_heapify(0)
Traceback (most recent call last):
...
TypeError: 'list' object is not callable
>>> priority_queue_test.array
[(10, 'A'), (5, 'B')]
"""
lc = self.left(idx)
rc = self.right(idx)
if lc < self.cur_size and self.array(lc)[0] < self.array[idx][0]:
smallest = lc
else:
smallest = idx
if rc < self.cur_size and self.array(rc)[0] < self.array[smallest][0]:
smallest = rc
if smallest != idx:
self.swap(idx, smallest)
self.min_heapify(smallest)
def insert(self, tup):
"""
Inserts a node into the Priority Queue.
Examples:
>>> priority_queue_test = PriorityQueue()
>>> priority_queue_test.insert((10, 'A'))
>>> priority_queue_test.array
[(10, 'A')]
>>> priority_queue_test.insert((15, 'B'))
>>> priority_queue_test.array
[(10, 'A'), (15, 'B')]
>>> priority_queue_test.insert((5, 'C'))
>>> priority_queue_test.array
[(5, 'C'), (10, 'A'), (15, 'B')]
"""
self.pos[tup[1]] = self.cur_size
self.cur_size += 1
self.array.append((sys.maxsize, tup[1]))
self.decrease_key((sys.maxsize, tup[1]), tup[0])
def extract_min(self):
"""
Removes and returns the min element at top of priority queue.
Examples:
>>> priority_queue_test = PriorityQueue()
>>> priority_queue_test.array = [(10, 'A'), (15, 'B')]
>>> priority_queue_test.cur_size = len(priority_queue_test.array)
>>> priority_queue_test.pos = {'A': 0, 'B': 1}
>>> priority_queue_test.insert((5, 'C'))
>>> priority_queue_test.extract_min()
'C'
>>> priority_queue_test.array[0]
(15, 'B')
"""
min_node = self.array[0][1]
self.array[0] = self.array[self.cur_size - 1]
self.cur_size -= 1
self.min_heapify(1)
del self.pos[min_node]
return min_node
def left(self, i):
"""
Returns the index of left child
Examples:
>>> priority_queue_test = PriorityQueue()
>>> priority_queue_test.left(0)
1
>>> priority_queue_test.left(1)
3
"""
return 2 * i + 1
def right(self, i):
"""
Returns the index of right child
Examples:
>>> priority_queue_test = PriorityQueue()
>>> priority_queue_test.right(0)
2
>>> priority_queue_test.right(1)
4
"""
return 2 * i + 2
def par(self, i):
"""
Returns the index of parent
Examples:
>>> priority_queue_test = PriorityQueue()
>>> priority_queue_test.par(1)
0
>>> priority_queue_test.par(2)
1
>>> priority_queue_test.par(4)
2
"""
return math.floor(i / 2)
def swap(self, i, j):
"""
Swaps array elements at indices i and j, update the pos{}
Examples:
>>> priority_queue_test = PriorityQueue()
>>> priority_queue_test.array = [(10, 'A'), (15, 'B')]
>>> priority_queue_test.cur_size = len(priority_queue_test.array)
>>> priority_queue_test.pos = {'A': 0, 'B': 1}
>>> priority_queue_test.swap(0, 1)
>>> priority_queue_test.array
[(15, 'B'), (10, 'A')]
>>> priority_queue_test.pos
{'A': 1, 'B': 0}
"""
self.pos[self.array[i][1]] = j
self.pos[self.array[j][1]] = i
temp = self.array[i]
self.array[i] = self.array[j]
self.array[j] = temp
def decrease_key(self, tup, new_d):
"""
Decrease the key value for a given tuple, assuming the new_d is at most old_d.
Examples:
>>> priority_queue_test = PriorityQueue()
>>> priority_queue_test.array = [(10, 'A'), (15, 'B')]
>>> priority_queue_test.cur_size = len(priority_queue_test.array)
>>> priority_queue_test.pos = {'A': 0, 'B': 1}
>>> priority_queue_test.decrease_key((10, 'A'), 5)
>>> priority_queue_test.array
[(5, 'A'), (15, 'B')]
"""
idx = self.pos[tup[1]]
# assuming the new_d is at most old_d
self.array[idx] = (new_d, tup[1])
while idx > 0 and self.array[self.par(idx)][0] > self.array[idx][0]:
self.swap(idx, self.par(idx))
idx = self.par(idx)
class Graph:
def __init__(self, num):
"""
Graph class constructor
Examples:
>>> graph_test = Graph(1)
>>> graph_test.num_nodes
1
>>> graph_test.dist
[0]
>>> graph_test.par
[-1]
>>> graph_test.adjList
{}
"""
self.adjList = {} # To store graph: u -> (v,w)
self.num_nodes = num # Number of nodes in graph
# To store the distance from source vertex
self.dist = [0] * self.num_nodes
self.par = [-1] * self.num_nodes # To store the path
def add_edge(self, u, v, w):
"""
Add edge going from node u to v and v to u with weight w: u (w)-> v, v (w) -> u
Examples:
>>> graph_test = Graph(1)
>>> graph_test.add_edge(1, 2, 1)
>>> graph_test.add_edge(2, 3, 2)
>>> graph_test.adjList
{1: [(2, 1)], 2: [(1, 1), (3, 2)], 3: [(2, 2)]}
"""
# Check if u already in graph
if u in self.adjList:
self.adjList[u].append((v, w))
else:
self.adjList[u] = [(v, w)]
# Assuming undirected graph
if v in self.adjList:
self.adjList[v].append((u, w))
else:
self.adjList[v] = [(u, w)]
def show_graph(self):
"""
Show the graph: u -> v(w)
Examples:
>>> graph_test = Graph(1)
>>> graph_test.add_edge(1, 2, 1)
>>> graph_test.show_graph()
1 -> 2(1)
2 -> 1(1)
>>> graph_test.add_edge(2, 3, 2)
>>> graph_test.show_graph()
1 -> 2(1)
2 -> 1(1) -> 3(2)
3 -> 2(2)
"""
for u in self.adjList:
print(u, "->", " -> ".join(str(f"{v}({w})") for v, w in self.adjList[u]))
def dijkstra(self, src):
"""
Dijkstra algorithm
Examples:
>>> graph_test = Graph(3)
>>> graph_test.add_edge(0, 1, 2)
>>> graph_test.add_edge(1, 2, 2)
>>> graph_test.dijkstra(0)
Distance from node: 0
Node 0 has distance: 0
Node 1 has distance: 2
Node 2 has distance: 4
>>> graph_test.dist
[0, 2, 4]
>>> graph_test = Graph(2)
>>> graph_test.add_edge(0, 1, 2)
>>> graph_test.dijkstra(0)
Distance from node: 0
Node 0 has distance: 0
Node 1 has distance: 2
>>> graph_test.dist
[0, 2]
>>> graph_test = Graph(3)
>>> graph_test.add_edge(0, 1, 2)
>>> graph_test.dijkstra(0)
Distance from node: 0
Node 0 has distance: 0
Node 1 has distance: 2
Node 2 has distance: 0
>>> graph_test.dist
[0, 2, 0]
>>> graph_test = Graph(3)
>>> graph_test.add_edge(0, 1, 2)
>>> graph_test.add_edge(1, 2, 2)
>>> graph_test.add_edge(0, 2, 1)
>>> graph_test.dijkstra(0)
Distance from node: 0
Node 0 has distance: 0
Node 1 has distance: 2
Node 2 has distance: 1
>>> graph_test.dist
[0, 2, 1]
>>> graph_test = Graph(4)
>>> graph_test.add_edge(0, 1, 4)
>>> graph_test.add_edge(1, 2, 2)
>>> graph_test.add_edge(2, 3, 1)
>>> graph_test.add_edge(0, 2, 3)
>>> graph_test.dijkstra(0)
Distance from node: 0
Node 0 has distance: 0
Node 1 has distance: 4
Node 2 has distance: 3
Node 3 has distance: 4
>>> graph_test.dist
[0, 4, 3, 4]
>>> graph_test = Graph(4)
>>> graph_test.add_edge(0, 1, 4)
>>> graph_test.add_edge(1, 2, 2)
>>> graph_test.add_edge(2, 3, 1)
>>> graph_test.add_edge(0, 2, 7)
>>> graph_test.dijkstra(0)
Distance from node: 0
Node 0 has distance: 0
Node 1 has distance: 4
Node 2 has distance: 6
Node 3 has distance: 7
>>> graph_test.dist
[0, 4, 6, 7]
"""
# Flush old junk values in par[]
self.par = [-1] * self.num_nodes
# src is the source node
self.dist[src] = 0
q = PriorityQueue()
q.insert((0, src)) # (dist from src, node)
for u in self.adjList:
if u != src:
self.dist[u] = sys.maxsize # Infinity
self.par[u] = -1
while not q.is_empty():
u = q.extract_min() # Returns node with the min dist from source
# Update the distance of all the neighbours of u and
# if their prev dist was INFINITY then push them in Q
for v, w in self.adjList[u]:
new_dist = self.dist[u] + w
if self.dist[v] > new_dist:
if self.dist[v] == sys.maxsize:
q.insert((new_dist, v))
else:
q.decrease_key((self.dist[v], v), new_dist)
self.dist[v] = new_dist
self.par[v] = u
# Show the shortest distances from src
self.show_distances(src)
def show_distances(self, src):
"""
Show the distances from src to all other nodes in a graph
Examples:
>>> graph_test = Graph(1)
>>> graph_test.show_distances(0)
Distance from node: 0
Node 0 has distance: 0
"""
print(f"Distance from node: {src}")
for u in range(self.num_nodes):
print(f"Node {u} has distance: {self.dist[u]}")
def show_path(self, src, dest):
"""
Shows the shortest path from src to dest.
WARNING: Use it *after* calling dijkstra.
Examples:
>>> graph_test = Graph(4)
>>> graph_test.add_edge(0, 1, 1)
>>> graph_test.add_edge(1, 2, 2)
>>> graph_test.add_edge(2, 3, 3)
>>> graph_test.dijkstra(0)
Distance from node: 0
Node 0 has distance: 0
Node 1 has distance: 1
Node 2 has distance: 3
Node 3 has distance: 6
>>> graph_test.show_path(0, 3) # doctest: +NORMALIZE_WHITESPACE
----Path to reach 3 from 0----
0 -> 1 -> 2 -> 3
Total cost of path: 6
"""
path = []
cost = 0
temp = dest
# Backtracking from dest to src
while self.par[temp] != -1:
path.append(temp)
if temp != src:
for v, w in self.adjList[temp]:
if v == self.par[temp]:
cost += w
break
temp = self.par[temp]
path.append(src)
path.reverse()
print(f"----Path to reach {dest} from {src}----")
for u in path:
print(f"{u}", end=" ")
if u != dest:
print("-> ", end="")
print("\nTotal cost of path: ", cost)
if __name__ == "__main__":
from doctest import testmod
testmod()
graph = Graph(9)
graph.add_edge(0, 1, 4)
graph.add_edge(0, 7, 8)
graph.add_edge(1, 2, 8)
graph.add_edge(1, 7, 11)
graph.add_edge(2, 3, 7)
graph.add_edge(2, 8, 2)
graph.add_edge(2, 5, 4)
graph.add_edge(3, 4, 9)
graph.add_edge(3, 5, 14)
graph.add_edge(4, 5, 10)
graph.add_edge(5, 6, 2)
graph.add_edge(6, 7, 1)
graph.add_edge(6, 8, 6)
graph.add_edge(7, 8, 7)
graph.show_graph()
graph.dijkstra(0)
graph.show_path(0, 4)
# OUTPUT
# 0 -> 1(4) -> 7(8)
# 1 -> 0(4) -> 2(8) -> 7(11)
# 7 -> 0(8) -> 1(11) -> 6(1) -> 8(7)
# 2 -> 1(8) -> 3(7) -> 8(2) -> 5(4)
# 3 -> 2(7) -> 4(9) -> 5(14)
# 8 -> 2(2) -> 6(6) -> 7(7)
# 5 -> 2(4) -> 3(14) -> 4(10) -> 6(2)
# 4 -> 3(9) -> 5(10)
# 6 -> 5(2) -> 7(1) -> 8(6)
# Distance from node: 0
# Node 0 has distance: 0
# Node 1 has distance: 4
# Node 2 has distance: 12
# Node 3 has distance: 19
# Node 4 has distance: 21
# Node 5 has distance: 11
# Node 6 has distance: 9
# Node 7 has distance: 8
# Node 8 has distance: 14
# ----Path to reach 4 from 0----
# 0 -> 7 -> 6 -> 5 -> 4
# Total cost of path: 21