MaximSmolskiy 2024-06-16 21:31:27 +03:00
commit b2e30e7e21
14 changed files with 469 additions and 199 deletions

View File

@@ -16,20 +16,20 @@ repos:
       - id: auto-walrus

   - repo: https://github.com/astral-sh/ruff-pre-commit
-    rev: v0.4.3
+    rev: v0.4.7
     hooks:
       - id: ruff
       - id: ruff-format

   - repo: https://github.com/codespell-project/codespell
-    rev: v2.2.6
+    rev: v2.3.0
     hooks:
       - id: codespell
         additional_dependencies:
           - tomli

   - repo: https://github.com/tox-dev/pyproject-fmt
-    rev: "1.8.0"
+    rev: "2.1.3"
     hooks:
       - id: pyproject-fmt
@@ -42,7 +42,7 @@ repos:
     pass_filenames: false

   - repo: https://github.com/abravalheri/validate-pyproject
-    rev: v0.16
+    rev: v0.18
     hooks:
       - id: validate-pyproject

View File

@@ -661,7 +661,6 @@
   * [Manhattan Distance](maths/manhattan_distance.py)
   * [Matrix Exponentiation](maths/matrix_exponentiation.py)
   * [Max Sum Sliding Window](maths/max_sum_sliding_window.py)
-  * [Median Of Two Arrays](maths/median_of_two_arrays.py)
   * [Minkowski Distance](maths/minkowski_distance.py)
   * [Mobius Function](maths/mobius_function.py)
   * [Modular Division](maths/modular_division.py)

View File

@@ -23,6 +23,42 @@ def create_state_space_tree(
     Creates a state space tree to iterate through each branch using DFS.
     We know that each state has exactly len(sequence) - index children.
     It terminates when it reaches the end of the given sequence.
+
+    :param sequence: The input sequence for which permutations are generated.
+    :param current_sequence: The current permutation being built.
+    :param index: The current index in the sequence.
+    :param index_used: List to track which elements are already used in the permutation.
+
+    Example 1:
+    >>> sequence = [1, 2, 3]
+    >>> current_sequence = []
+    >>> index_used = [False, False, False]
+    >>> create_state_space_tree(sequence, current_sequence, 0, index_used)
+    [1, 2, 3]
+    [1, 3, 2]
+    [2, 1, 3]
+    [2, 3, 1]
+    [3, 1, 2]
+    [3, 2, 1]
+
+    Example 2:
+    >>> sequence = ["A", "B", "C"]
+    >>> current_sequence = []
+    >>> index_used = [False, False, False]
+    >>> create_state_space_tree(sequence, current_sequence, 0, index_used)
+    ['A', 'B', 'C']
+    ['A', 'C', 'B']
+    ['B', 'A', 'C']
+    ['B', 'C', 'A']
+    ['C', 'A', 'B']
+    ['C', 'B', 'A']
+
+    Example 3:
+    >>> sequence = [1]
+    >>> current_sequence = []
+    >>> index_used = [False]
+    >>> create_state_space_tree(sequence, current_sequence, 0, index_used)
+    [1]
     """
     if index == len(sequence):
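
For context, a minimal sketch of the backtracking these doctests exercise, reconstructed from the docstring and output order above (an assumption, not the file's verbatim body):

```python
# Reconstructed sketch: prints each permutation in the order the doctests show.
def create_state_space_tree(
    sequence: list, current_sequence: list, index: int, index_used: list
) -> None:
    if index == len(sequence):
        print(current_sequence)
        return
    for i in range(len(sequence)):
        if not index_used[i]:
            # choose element i, recurse one level deeper, then undo the choice
            current_sequence.append(sequence[i])
            index_used[i] = True
            create_state_space_tree(sequence, current_sequence, index + 1, index_used)
            current_sequence.pop()
            index_used[i] = False
```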

View File

@@ -22,6 +22,56 @@ def create_state_space_tree(
     Creates a state space tree to iterate through each branch using DFS.
     We know that each state has exactly two children.
     It terminates when it reaches the end of the given sequence.
+
+    :param sequence: The input sequence for which subsequences are generated.
+    :param current_subsequence: The current subsequence being built.
+    :param index: The current index in the sequence.
+
+    Example:
+    >>> sequence = [3, 2, 1]
+    >>> current_subsequence = []
+    >>> create_state_space_tree(sequence, current_subsequence, 0)
+    []
+    [1]
+    [2]
+    [2, 1]
+    [3]
+    [3, 1]
+    [3, 2]
+    [3, 2, 1]
+
+    >>> sequence = ["A", "B"]
+    >>> current_subsequence = []
+    >>> create_state_space_tree(sequence, current_subsequence, 0)
+    []
+    ['B']
+    ['A']
+    ['A', 'B']
+
+    >>> sequence = []
+    >>> current_subsequence = []
+    >>> create_state_space_tree(sequence, current_subsequence, 0)
+    []
+
+    >>> sequence = [1, 2, 3, 4]
+    >>> current_subsequence = []
+    >>> create_state_space_tree(sequence, current_subsequence, 0)
+    []
+    [4]
+    [3]
+    [3, 4]
+    [2]
+    [2, 4]
+    [2, 3]
+    [2, 3, 4]
+    [1]
+    [1, 4]
+    [1, 3]
+    [1, 3, 4]
+    [1, 2]
+    [1, 2, 4]
+    [1, 2, 3]
+    [1, 2, 3, 4]
     """
     if index == len(sequence):
@@ -35,7 +85,7 @@ def create_state_space_tree(

 if __name__ == "__main__":
-    seq: list[Any] = [3, 1, 2, 4]
+    seq: list[Any] = [1, 2, 3]
     generate_all_subsequences(seq)
     seq.clear()
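
Again for context, a minimal sketch of the two-way branching the docstring describes, reconstructed from the doctest output order (an assumption, not the file's verbatim body):

```python
# Reconstructed sketch: at each index, branch on excluding then including the element.
def create_state_space_tree(
    sequence: list, current_subsequence: list, index: int
) -> None:
    if index == len(sequence):
        print(current_subsequence)
        return
    # Branch 1: exclude sequence[index] (printed first, matching the doctests)
    create_state_space_tree(sequence, current_subsequence, index + 1)
    # Branch 2: include sequence[index], then backtrack
    current_subsequence.append(sequence[index])
    create_state_space_tree(sequence, current_subsequence, index + 1)
    current_subsequence.pop()
```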

View File

@@ -26,7 +26,7 @@ def binary_and(a: int, b: int) -> str:
     >>> binary_and(0, 1.1)
     Traceback (most recent call last):
         ...
-    TypeError: 'float' object cannot be interpreted as an integer
+    ValueError: Unknown format code 'b' for object of type 'float'
     >>> binary_and("0", "1")
     Traceback (most recent call last):
         ...
@@ -35,8 +35,8 @@ def binary_and(a: int, b: int) -> str:
     if a < 0 or b < 0:
         raise ValueError("the value of both inputs must be positive")

-    a_binary = str(bin(a))[2:]  # remove the leading "0b"
-    b_binary = str(bin(b))[2:]  # remove the leading "0b"
+    a_binary = format(a, "b")
+    b_binary = format(b, "b")

     max_len = max(len(a_binary), len(b_binary))
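
The doctest change follows from swapping `bin(x)[2:]` for `format(x, "b")`: both render the binary digits without the `0b` prefix, but `format` rejects floats with a `ValueError` where `bin` raised a `TypeError`. A quick illustration (not part of the commit):

```python
print(format(5, "b"))  # '101' -- binary digits, no "0b" prefix to strip
print(bin(5)[2:])      # '101' -- the old spelling, prefix removed by slicing
try:
    format(1.1, "b")
except ValueError as err:
    print(err)  # Unknown format code 'b' for object of type 'float'
```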

View File

@@ -98,7 +98,7 @@ class SegmentTree:

     def show_data(self):
         show_list = []
-        for i in range(1, N + 1):
+        for i in range(1, self.N + 1):
             show_list += [self.query(i, i)]
         print(show_list)
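
This fix matters because, inside a method, a bare `N` resolves to a global name rather than the instance attribute. A minimal illustration with a hypothetical `Demo` class (not from the repository):

```python
class Demo:
    def __init__(self, n: int) -> None:
        self.N = n

    def broken(self) -> list[int]:
        return list(range(1, N + 1))  # NameError: name 'N' is not defined

    def fixed(self) -> list[int]:
        return list(range(1, self.N + 1))


print(Demo(3).fixed())  # [1, 2, 3]
```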

View File

@@ -2,6 +2,20 @@ def actual_power(a: int, b: int):
     """
     Function using divide and conquer to calculate a^b.
     It only works for integer a,b.
+
+    :param a: The base of the power operation, an integer.
+    :param b: The exponent of the power operation, a non-negative integer.
+    :return: The result of a^b.
+
+    Examples:
+    >>> actual_power(3, 2)
+    9
+    >>> actual_power(5, 3)
+    125
+    >>> actual_power(2, 5)
+    32
+    >>> actual_power(7, 0)
+    1
     """
     if b == 0:
         return 1
@@ -13,6 +27,10 @@ def actual_power(a: int, b: int):

 def power(a: int, b: int) -> float:
     """
+    :param a: The base (integer).
+    :param b: The exponent (integer).
+    :return: The result of a^b, as a float for negative exponents.
+
     >>> power(4,6)
     4096
     >>> power(2,3)
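
For context, a minimal sketch of the divide-and-conquer recursion these docstrings describe, halving the exponent at each step (an assumption about the file's body; only the `b == 0` base case is visible in the hunk):

```python
def actual_power(a: int, b: int) -> int:
    if b == 0:
        return 1
    half = actual_power(a, b // 2)  # one recursive call per halving: O(log b)
    if b % 2 == 0:
        return half * half  # a^b = (a^(b//2))^2 when b is even
    return a * half * half  # an odd b keeps one extra factor of a


def power(a: int, b: int) -> float:
    # negative exponents fall back to 1 / a^|b|
    return 1 / actual_power(a, -b) if b < 0 else actual_power(a, b)


print(power(4, 6))   # 4096
print(power(2, -3))  # 0.125
```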

View File

@@ -215,7 +215,7 @@ class PriorityQueue:
         [(5, 'A'), (15, 'B')]
         """
         idx = self.pos[tup[1]]
-        # assuming the new_d is atmost old_d
+        # assuming the new_d is at most old_d
         self.array[idx] = (new_d, tup[1])
         while idx > 0 and self.array[self.par(idx)][0] > self.array[idx][0]:
             self.swap(idx, self.par(idx))

View File

@@ -629,6 +629,40 @@ def smooth_l1_loss(y_true: np.ndarray, y_pred: np.ndarray, beta: float = 1.0) ->
     return np.mean(loss)


+def kullback_leibler_divergence(y_true: np.ndarray, y_pred: np.ndarray) -> float:
+    """
+    Calculate the Kullback-Leibler divergence (KL divergence) loss between true labels
+    and predicted probabilities.
+
+    KL divergence loss quantifies dissimilarity between true labels and predicted
+    probabilities. It's often used in training generative models.
+
+    KL = Σ(y_true * ln(y_true / y_pred))
+
+    Reference: https://en.wikipedia.org/wiki/Kullback%E2%80%93Leibler_divergence
+
+    Parameters:
+    - y_true: True class probabilities
+    - y_pred: Predicted class probabilities
+
+    >>> true_labels = np.array([0.2, 0.3, 0.5])
+    >>> predicted_probs = np.array([0.3, 0.3, 0.4])
+    >>> kullback_leibler_divergence(true_labels, predicted_probs)
+    0.030478754035472025
+    >>> true_labels = np.array([0.2, 0.3, 0.5])
+    >>> predicted_probs = np.array([0.3, 0.3, 0.4, 0.5])
+    >>> kullback_leibler_divergence(true_labels, predicted_probs)
+    Traceback (most recent call last):
+        ...
+    ValueError: Input arrays must have the same length.
+    """
+    if len(y_true) != len(y_pred):
+        raise ValueError("Input arrays must have the same length.")
+
+    kl_loss = y_true * np.log(y_true / y_pred)
+    return np.sum(kl_loss)
+
+
 if __name__ == "__main__":
     import doctest
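
A quick hand-check of the doctest value above (illustrative only, not part of the commit):

```python
import numpy as np

# KL = Σ(y_true * ln(y_true / y_pred)), evaluated term by term:
# 0.2*ln(0.2/0.3) + 0.3*ln(0.3/0.3) + 0.5*ln(0.5/0.4) ≈ -0.08109 + 0 + 0.11157
p = np.array([0.2, 0.3, 0.5])
q = np.array([0.3, 0.3, 0.4])
print(np.sum(p * np.log(p / q)))  # 0.030478754035472025, matching the doctest
```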

View File

@@ -1,11 +1,9 @@
 """
-Implementation of sequential minimal optimization (SMO) for support vector machines
-(SVM).
+Sequential minimal optimization (SMO) for support vector machines (SVM)

 Sequential minimal optimization (SMO) is an algorithm for solving the quadratic
-programming (QP) problem that arises during the training of support vector
-machines.
-It was invented by John Platt in 1998.
+programming (QP) problem that arises during the training of SVMs. It was invented by
+John Platt in 1998.

 Input:
     0: type: numpy.ndarray.
@@ -124,8 +122,7 @@ class SmoSVM:
             b_old = self._b
             self._b = b

-            # 4: update error value,here we only calculate those non-bound samples'
-            # error
+            # 4: update error, here we only calculate the error for non-bound samples
             self._unbound = [i for i in self._all_samples if self._is_unbound(i)]
             for s in self.unbound:
                 if s in (i1, i2):
@@ -136,7 +133,7 @@ class SmoSVM:
                     + (self._b - b_old)
                 )

-            # if i1 or i2 is non-bound,update there error value to zero
+            # if i1 or i2 is non-bound, update their error value to zero
             if self._is_unbound(i1):
                 self._error[i1] = 0
             if self._is_unbound(i2):
@@ -161,7 +158,7 @@ class SmoSVM:
             results.append(result)
         return np.array(results)

-    # Check if alpha violate KKT condition
+    # Check if alpha violates the KKT condition
     def _check_obey_kkt(self, index):
         alphas = self.alphas
         tol = self._tol
@@ -172,20 +169,19 @@ class SmoSVM:

     # Get value calculated from kernel function
     def _k(self, i1, i2):
-        # for test samples,use Kernel function
+        # for test samples, use kernel function
         if isinstance(i2, np.ndarray):
             return self.Kernel(self.samples[i1], i2)
-        # for train samples,Kernel values have been saved in matrix
+        # for training samples, kernel values have been saved in matrix
         else:
             return self._K_matrix[i1, i2]

-    # Get sample's error
+    # Get error for sample
     def _e(self, index):
         """
         Two cases:
-        1:Sample[index] is non-bound,Fetch error from list: _error
-        2:sample[index] is bound,Use predicted value deduct true value: g(xi) - yi
+        1: Sample[index] is non-bound, fetch error from list: _error
+        2: sample[index] is bound, use predicted value minus true value: g(xi) - yi
         """
         # get from error data
         if self._is_unbound(index):
@@ -196,7 +192,7 @@ class SmoSVM:
             yi = self.tags[index]
             return gx - yi

-    # Calculate Kernel matrix of all possible i1,i2 ,saving time
+    # Calculate kernel matrix of all possible i1, i2, saving time
     def _calculate_k_matrix(self):
         k_matrix = np.zeros([self.length, self.length])
         for i in self._all_samples:
@@ -206,7 +202,7 @@ class SmoSVM:
             )
         return k_matrix

-    # Predict test sample's tag
+    # Predict tag for test sample
     def _predict(self, sample):
         k = self._k
         predicted_value = (
@@ -222,30 +218,31 @@ class SmoSVM:

     # Choose alpha1 and alpha2
     def _choose_alphas(self):
-        locis = yield from self._choose_a1()
-        if not locis:
+        loci = yield from self._choose_a1()
+        if not loci:
             return None
-        return locis
+        return loci

     def _choose_a1(self):
         """
-        Choose first alpha ;steps:
-        1:First loop over all sample
-        2:Second loop over all non-bound samples till all non-bound samples does not
-        voilate kkt condition.
-        3:Repeat this two process endlessly,till all samples does not voilate kkt
-        condition samples after first loop.
+        Choose first alpha
+        Steps:
+        1: First loop over all samples
+        2: Second loop over all non-bound samples until no non-bound samples violate
+           the KKT condition.
+        3: Repeat these two processes until no samples violate the KKT condition
+           after the first loop.
         """
         while True:
             all_not_obey = True
             # all sample
-            print("scanning all sample!")
+            print("Scanning all samples!")
             for i1 in [i for i in self._all_samples if self._check_obey_kkt(i)]:
                 all_not_obey = False
                 yield from self._choose_a2(i1)

             # non-bound sample
-            print("scanning non-bound sample!")
+            print("Scanning non-bound samples!")
             while True:
                 not_obey = True
                 for i1 in [
@@ -256,20 +253,21 @@ class SmoSVM:
                         not_obey = False
                         yield from self._choose_a2(i1)
                 if not_obey:
-                    print("all non-bound samples fit the KKT condition!")
+                    print("All non-bound samples satisfy the KKT condition!")
                     break
             if all_not_obey:
-                print("all samples fit the KKT condition! Optimization done!")
+                print("All samples satisfy the KKT condition!")
                 break
         return False

     def _choose_a2(self, i1):
         """
-        Choose the second alpha by using heuristic algorithm ;steps:
-        1: Choose alpha2 which gets the maximum step size (|E1 - E2|).
-        2: Start in a random point,loop over all non-bound samples till alpha1 and
+        Choose the second alpha using a heuristic algorithm
+        Steps:
+        1: Choose alpha2 that maximizes the step size (|E1 - E2|).
+        2: Start in a random point, loop over all non-bound samples till alpha1 and
            alpha2 are optimized.
-        3: Start in a random point,loop over all samples till alpha1 and alpha2 are
+        3: Start in a random point, loop over all samples till alpha1 and alpha2 are
            optimized.
         """
         self._unbound = [i for i in self._all_samples if self._is_unbound(i)]
@@ -320,7 +318,7 @@ class SmoSVM:
         k22 = k(i2, i2)
         k12 = k(i1, i2)

-        # select the new alpha2 which could get the minimal objectives
+        # select the new alpha2 which could achieve the minimal objectives
         if (eta := k11 + k22 - 2.0 * k12) > 0.0:
             a2_new_unc = a2 + (y2 * (e1 - e2)) / eta
             # a2_new has a boundary
@@ -335,7 +333,7 @@ class SmoSVM:
             l1 = a1 + s * (a2 - l)
             h1 = a1 + s * (a2 - h)

-            # way 1
+            # Method 1
             f1 = y1 * (e1 + b) - a1 * k(i1, i1) - s * a2 * k(i1, i2)
             f2 = y2 * (e2 + b) - a2 * k(i2, i2) - s * a1 * k(i1, i2)
             ol = (
@@ -353,9 +351,8 @@ class SmoSVM:
                 + s * h * h1 * k(i1, i2)
             )
             """
-            # way 2
-            Use objective function check which alpha2 new could get the minimal
-            objectives
+            Method 2: Use objective function to check which alpha2_new could achieve
+            the minimal objectives
             """
             if ol < (oh - self._eps):
                 a2_new = l
@@ -375,7 +372,7 @@ class SmoSVM:

         return a1_new, a2_new

-    # Normalise data using min_max way
+    # Normalize data using min-max method
     def _norm(self, data):
         if self._init:
             self._min = np.min(data, axis=0)
@@ -424,7 +421,7 @@ class Kernel:

     def _check(self):
         if self._kernel == self._rbf and self.gamma < 0:
-            raise ValueError("gamma value must greater than 0")
+            raise ValueError("gamma value must be non-negative")

     def _get_kernel(self, kernel_name):
         maps = {"linear": self._linear, "poly": self._polynomial, "rbf": self._rbf}
@@ -444,26 +441,30 @@ def count_time(func):
         start_time = time.time()
         func(*args, **kwargs)
         end_time = time.time()
-        print(f"smo algorithm cost {end_time - start_time} seconds")
+        print(f"SMO algorithm cost {end_time - start_time} seconds")

     return call_func


 @count_time
-def test_cancel_data():
-    print("Hello!\nStart test svm by smo algorithm!")
+def test_cancer_data():
+    print("Hello!\nStart test SVM using the SMO algorithm!")
     # 0: download dataset and load into pandas' dataframe
-    if not os.path.exists(r"cancel_data.csv"):
+    if not os.path.exists(r"cancer_data.csv"):
         request = urllib.request.Request(  # noqa: S310
             CANCER_DATASET_URL,
             headers={"User-Agent": "Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)"},
         )
         response = urllib.request.urlopen(request)  # noqa: S310
         content = response.read().decode("utf-8")
-        with open(r"cancel_data.csv", "w") as f:
+        with open(r"cancer_data.csv", "w") as f:
             f.write(content)
-    data = pd.read_csv(r"cancel_data.csv", header=None)
+    data = pd.read_csv(
+        "cancer_data.csv",
+        header=None,
+        dtype={0: str},  # Assuming the first column contains string data
+    )

     # 1: pre-processing data
     del data[data.columns.tolist()[0]]
@@ -475,14 +476,14 @@ def test_cancel_data():
     train_data, test_data = samples[:328, :], samples[328:, :]
     test_tags, test_samples = test_data[:, 0], test_data[:, 1:]

-    # 3: choose kernel function,and set initial alphas to zero(optional)
-    mykernel = Kernel(kernel="rbf", degree=5, coef0=1, gamma=0.5)
+    # 3: choose kernel function, and set initial alphas to zero (optional)
+    my_kernel = Kernel(kernel="rbf", degree=5, coef0=1, gamma=0.5)
     al = np.zeros(train_data.shape[0])

     # 4: calculating best alphas using SMO algorithm and predict test_data samples
     mysvm = SmoSVM(
         train=train_data,
-        kernel_func=mykernel,
+        kernel_func=my_kernel,
         alpha_list=al,
         cost=0.4,
         b=0.0,
@@ -497,30 +498,30 @@ def test_cancel_data():
     for i in range(test_tags.shape[0]):
         if test_tags[i] == predict[i]:
             score += 1
-    print(f"\nall: {test_num}\nright: {score}\nfalse: {test_num - score}")
+    print(f"\nAll: {test_num}\nCorrect: {score}\nIncorrect: {test_num - score}")
     print(f"Rough Accuracy: {score / test_tags.shape[0]}")


 def test_demonstration():
     # change stdout
-    print("\nStart plot,please wait!!!")
+    print("\nStarting plot, please wait!")
     sys.stdout = open(os.devnull, "w")
     ax1 = plt.subplot2grid((2, 2), (0, 0))
     ax2 = plt.subplot2grid((2, 2), (0, 1))
     ax3 = plt.subplot2grid((2, 2), (1, 0))
     ax4 = plt.subplot2grid((2, 2), (1, 1))
-    ax1.set_title("linear svm,cost:0.1")
+    ax1.set_title("Linear SVM, cost = 0.1")
     test_linear_kernel(ax1, cost=0.1)
-    ax2.set_title("linear svm,cost:500")
+    ax2.set_title("Linear SVM, cost = 500")
     test_linear_kernel(ax2, cost=500)
-    ax3.set_title("rbf kernel svm,cost:0.1")
+    ax3.set_title("RBF kernel SVM, cost = 0.1")
     test_rbf_kernel(ax3, cost=0.1)
-    ax4.set_title("rbf kernel svm,cost:500")
+    ax4.set_title("RBF kernel SVM, cost = 500")
     test_rbf_kernel(ax4, cost=500)

     sys.stdout = sys.__stdout__
-    print("Plot done!!!")
+    print("Plot done!")


 def test_linear_kernel(ax, cost):
@@ -531,10 +532,10 @@ def test_linear_kernel(ax, cost):
     scaler = StandardScaler()
     train_x_scaled = scaler.fit_transform(train_x, train_y)
     train_data = np.hstack((train_y.reshape(500, 1), train_x_scaled))
-    mykernel = Kernel(kernel="linear", degree=5, coef0=1, gamma=0.5)
+    my_kernel = Kernel(kernel="linear", degree=5, coef0=1, gamma=0.5)
     mysvm = SmoSVM(
         train=train_data,
-        kernel_func=mykernel,
+        kernel_func=my_kernel,
         cost=cost,
         tolerance=0.001,
         auto_norm=False,
@@ -551,10 +552,10 @@ def test_rbf_kernel(ax, cost):
     scaler = StandardScaler()
     train_x_scaled = scaler.fit_transform(train_x, train_y)
     train_data = np.hstack((train_y.reshape(500, 1), train_x_scaled))
-    mykernel = Kernel(kernel="rbf", degree=5, coef0=1, gamma=0.5)
+    my_kernel = Kernel(kernel="rbf", degree=5, coef0=1, gamma=0.5)
     mysvm = SmoSVM(
         train=train_data,
-        kernel_func=mykernel,
+        kernel_func=my_kernel,
         cost=cost,
         tolerance=0.001,
         auto_norm=False,
@@ -567,11 +568,11 @@ def plot_partition_boundary(
     model, train_data, ax, resolution=100, colors=("b", "k", "r")
 ):
     """
-    We can not get the optimum w of our kernel svm model which is different from linear
-    svm. For this reason, we generate randomly distributed points with high desity and
-    prediced values of these points are calculated by using our trained model. Then we
-    could use this prediced values to draw contour map.
-    And this contour map can represent svm's partition boundary.
+    We cannot get the optimal w of our kernel SVM model, which is different from a
+    linear SVM. For this reason, we generate randomly distributed points with high
+    density, and predicted values of these points are calculated using our trained
+    model. Then we could use these predicted values to draw a contour map, and this
+    contour map represents the SVM's partition boundary.
     """
     train_data_x = train_data[:, 1]
     train_data_y = train_data[:, 2]
@@ -616,6 +617,6 @@ def plot_partition_boundary(

 if __name__ == "__main__":
-    test_cancel_data()
+    test_cancer_data()
     test_demonstration()
     plt.show()

View File

@@ -1,33 +0,0 @@
-from __future__ import annotations
-
-
-def median_of_two_arrays(nums1: list[float], nums2: list[float]) -> float:
-    """
-    >>> median_of_two_arrays([1, 2], [3])
-    2
-    >>> median_of_two_arrays([0, -1.1], [2.5, 1])
-    0.5
-    >>> median_of_two_arrays([], [2.5, 1])
-    1.75
-    >>> median_of_two_arrays([], [0])
-    0
-    >>> median_of_two_arrays([], [])
-    Traceback (most recent call last):
-        ...
-    IndexError: list index out of range
-    """
-    all_numbers = sorted(nums1 + nums2)
-    div, mod = divmod(len(all_numbers), 2)
-    if mod == 1:
-        return all_numbers[div]
-    else:
-        return (all_numbers[div] + all_numbers[div - 1]) / 2
-
-
-if __name__ == "__main__":
-    import doctest
-
-    doctest.testmod()
-    array_1 = [float(x) for x in input("Enter the elements of first array: ").split()]
-    array_2 = [float(x) for x in input("Enter the elements of second array: ").split()]
-    print(f"The median of two arrays is: {median_of_two_arrays(array_1, array_2)}")

View File

@@ -0,0 +1,143 @@
+"""
+Rainfall Intensity
+==================
+This module contains functions to calculate the intensity of
+a rainfall event for a given duration and return period.
+
+This function uses the Sherman intensity-duration-frequency curve.
+
+References
+----------
+- Aparicio, F. (1997): Fundamentos de Hidrología de Superficie.
+  Balderas, México, Limusa. 303 p.
+- https://en.wikipedia.org/wiki/Intensity-duration-frequency_curve
+"""
+
+
+def rainfall_intensity(
+    coefficient_k: float,
+    coefficient_a: float,
+    coefficient_b: float,
+    coefficient_c: float,
+    return_period: float,
+    duration: float,
+) -> float:
+    """
+    Calculate the intensity of a rainfall event for a given duration and return period.
+    It's based on the Sherman intensity-duration-frequency curve:
+
+    I = k * T^a / (D + b)^c
+
+    where:
+        I = Intensity of the rainfall event [mm/h]
+        k, a, b, c = Coefficients obtained through statistical distribution fitting
+        T = Return period in years
+        D = Rainfall event duration in minutes
+
+    Parameters
+    ----------
+    coefficient_k : float
+        Coefficient obtained through statistical distribution fitting.
+    coefficient_a : float
+        Coefficient obtained through statistical distribution fitting.
+    coefficient_b : float
+        Coefficient obtained through statistical distribution fitting.
+    coefficient_c : float
+        Coefficient obtained through statistical distribution fitting.
+    return_period : float
+        Return period in years.
+    duration : float
+        Rainfall event duration in minutes.
+
+    Returns
+    -------
+    intensity : float
+        Intensity of the rainfall event in mm/h.
+
+    Raises
+    ------
+    ValueError
+        If any of the parameters are not positive.
+
+    Examples
+    --------
+    >>> rainfall_intensity(1000, 0.2, 11.6, 0.81, 10, 60)
+    49.83339231138578
+    >>> rainfall_intensity(1000, 0.2, 11.6, 0.81, 10, 30)
+    77.36319588106228
+    >>> rainfall_intensity(1000, 0.2, 11.6, 0.81, 5, 60)
+    43.382487747633625
+    >>> rainfall_intensity(0, 0.2, 11.6, 0.81, 10, 60)
+    Traceback (most recent call last):
+        ...
+    ValueError: All parameters must be positive.
+    >>> rainfall_intensity(1000, -0.2, 11.6, 0.81, 10, 60)
+    Traceback (most recent call last):
+        ...
+    ValueError: All parameters must be positive.
+    >>> rainfall_intensity(1000, 0.2, -11.6, 0.81, 10, 60)
+    Traceback (most recent call last):
+        ...
+    ValueError: All parameters must be positive.
+    >>> rainfall_intensity(1000, 0.2, 11.6, -0.81, 10, 60)
+    Traceback (most recent call last):
+        ...
+    ValueError: All parameters must be positive.
+    >>> rainfall_intensity(1000, 0, 11.6, 0.81, 10, 60)
+    Traceback (most recent call last):
+        ...
+    ValueError: All parameters must be positive.
+    >>> rainfall_intensity(1000, 0.2, 0, 0.81, 10, 60)
+    Traceback (most recent call last):
+        ...
+    ValueError: All parameters must be positive.
+    >>> rainfall_intensity(1000, 0.2, 11.6, 0, 10, 60)
+    Traceback (most recent call last):
+        ...
+    ValueError: All parameters must be positive.
+    >>> rainfall_intensity(0, 0.2, 11.6, 0.81, 10, 60)
+    Traceback (most recent call last):
+        ...
+    ValueError: All parameters must be positive.
+    >>> rainfall_intensity(1000, 0.2, 11.6, 0.81, 0, 60)
+    Traceback (most recent call last):
+        ...
+    ValueError: All parameters must be positive.
+    >>> rainfall_intensity(1000, 0.2, 11.6, 0.81, 10, 0)
+    Traceback (most recent call last):
+        ...
+    ValueError: All parameters must be positive.
+    """
+    if (
+        coefficient_k <= 0
+        or coefficient_a <= 0
+        or coefficient_b <= 0
+        or coefficient_c <= 0
+        or return_period <= 0
+        or duration <= 0
+    ):
+        raise ValueError("All parameters must be positive.")
+    intensity = (coefficient_k * (return_period**coefficient_a)) / (
+        (duration + coefficient_b) ** coefficient_c
+    )
+    return intensity
+
+
+if __name__ == "__main__":
+    import doctest
+
+    doctest.testmod()
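
A quick hand-check of the first doctest (illustrative only): with k=1000, a=0.2, b=11.6, c=0.81, T=10 and D=60, I = k * T^a / (D + b)^c:

```python
k, a, b, c, t, d = 1000, 0.2, 11.6, 0.81, 10, 60
print(k * t**a / (d + b) ** c)  # 49.83339231138578 mm/h, matching the doctest
```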

View File

@@ -58,7 +58,7 @@ def upf_len(num: int) -> int:

 def equality(iterable: list) -> bool:
     """
-    Check equality of ALL elements in an interable.
+    Check equality of ALL elements in an iterable
     >>> equality([1, 2, 3, 4])
     False
     >>> equality([2, 2, 2, 2])

View File

@@ -1,21 +1,9 @@
 [tool.ruff]
-lint.ignore = [  # `ruff rule S101` for a description of that rule
-  "B904",    # Within an `except` clause, raise exceptions with `raise ... from err` -- FIX ME
-  "B905",    # `zip()` without an explicit `strict=` parameter -- FIX ME
-  "EM101",   # Exception must not use a string literal, assign to variable first
-  "EXE001",  # Shebang is present but file is not executable -- DO NOT FIX
-  "G004",    # Logging statement uses f-string
-  "PLC1901", # `{}` can be simplified to `{}` as an empty string is falsey
-  "PLW060",  # Using global for `{name}` but no assignment is done -- DO NOT FIX
-  "PLW2901", # PLW2901: Redefined loop variable -- FIX ME
-  "PT011",   # `pytest.raises(Exception)` is too broad, set the `match` parameter or use a more specific exception
-  "PT018",   # Assertion should be broken down into multiple parts
-  "S101",    # Use of `assert` detected -- DO NOT FIX
-  "S311",    # Standard pseudo-random generators are not suitable for cryptographic purposes -- FIX ME
-  "SLF001",  # Private member accessed: `_Iterator` -- FIX ME
-  "UP038",   # Use `X | Y` in `{}` call instead of `(X, Y)` -- DO NOT FIX
-]
-lint.select = [  # https://beta.ruff.rs/docs/rules
+target-version = "py312"
+
+output-format = "full"
+lint.select = [
+  # https://beta.ruff.rs/docs/rules
   "A",     # flake8-builtins
   "ARG",   # flake8-unused-arguments
   "ASYNC", # flake8-async
@@ -68,30 +56,64 @@ lint.select = [  # https://beta.ruff.rs/docs/rules
   # "TCH",  # flake8-type-checking
   # "TRY",  # tryceratops
 ]
-output-format = "full"
-target-version = "py312"
-
-[tool.ruff.lint.mccabe]  # DO NOT INCREASE THIS VALUE
-max-complexity = 17  # default: 10
-
-[tool.ruff.lint.per-file-ignores]
-"arithmetic_analysis/newton_raphson.py" = ["PGH001"]
-"data_structures/binary_tree/binary_search_tree_recursive.py" = ["BLE001"]
-"data_structures/hashing/tests/test_hash_map.py" = ["BLE001"]
-"hashes/enigma_machine.py" = ["BLE001"]
-"machine_learning/sequential_minimum_optimization.py" = ["SIM115"]
-"matrix/sherman_morrison.py" = ["SIM103"]
-"other/l*u_cache.py" = ["RUF012"]
-"physics/newtons_second_law_of_motion.py" = ["BLE001"]
-"project_euler/problem_099/sol1.py" = ["SIM115"]
-"sorts/external_sort.py" = ["SIM115"]
-
-[tool.ruff.lint.pylint]  # DO NOT INCREASE THESE VALUES
-allow-magic-value-types = ["float", "int", "str"]
-max-args = 10  # default: 5
-max-branches = 20  # default: 12
-max-returns = 8  # default: 6
-max-statements = 88  # default: 50
+lint.ignore = [
+  # `ruff rule S101` for a description of that rule
+  "B904",    # Within an `except` clause, raise exceptions with `raise ... from err` -- FIX ME
+  "B905",    # `zip()` without an explicit `strict=` parameter -- FIX ME
+  "EM101",   # Exception must not use a string literal, assign to variable first
+  "EXE001",  # Shebang is present but file is not executable -- DO NOT FIX
+  "G004",    # Logging statement uses f-string
+  "PLC1901", # `{}` can be simplified to `{}` as an empty string is falsey
+  "PLW060",  # Using global for `{name}` but no assignment is done -- DO NOT FIX
+  "PLW2901", # PLW2901: Redefined loop variable -- FIX ME
+  "PT011",   # `pytest.raises(Exception)` is too broad, set the `match` parameter or use a more specific exception
+  "PT018",   # Assertion should be broken down into multiple parts
+  "S101",    # Use of `assert` detected -- DO NOT FIX
+  "S311",    # Standard pseudo-random generators are not suitable for cryptographic purposes -- FIX ME
+  "SLF001",  # Private member accessed: `_Iterator` -- FIX ME
+  "UP038",   # Use `X | Y` in `{}` call instead of `(X, Y)` -- DO NOT FIX
+]
+lint.per-file-ignores."arithmetic_analysis/newton_raphson.py" = [
+  "PGH001",
+]
+lint.per-file-ignores."data_structures/binary_tree/binary_search_tree_recursive.py" = [
+  "BLE001",
+]
+lint.per-file-ignores."data_structures/hashing/tests/test_hash_map.py" = [
+  "BLE001",
+]
+lint.per-file-ignores."hashes/enigma_machine.py" = [
+  "BLE001",
+]
+lint.per-file-ignores."machine_learning/sequential_minimum_optimization.py" = [
+  "SIM115",
+]
+lint.per-file-ignores."matrix/sherman_morrison.py" = [
+  "SIM103",
+]
+lint.per-file-ignores."other/l*u_cache.py" = [
+  "RUF012",
+]
+lint.per-file-ignores."physics/newtons_second_law_of_motion.py" = [
+  "BLE001",
+]
+lint.per-file-ignores."project_euler/problem_099/sol1.py" = [
+  "SIM115",
+]
+lint.per-file-ignores."sorts/external_sort.py" = [
+  "SIM115",
+]
+lint.mccabe.max-complexity = 17  # default: 10
+lint.pylint.allow-magic-value-types = [
+  "float",
+  "int",
+  "str",
+]
+lint.pylint.max-args = 10  # default: 5
+lint.pylint.max-branches = 20  # default: 12
+lint.pylint.max-returns = 8  # default: 6
+lint.pylint.max-statements = 88  # default: 50

 [tool.codespell]
 ignore-words-list = "3rt,ans,bitap,crate,damon,fo,followings,hist,iff,kwanza,manuel,mater,secant,som,sur,tim,toi,zar"
@@ -110,6 +132,6 @@ addopts = [
 [tool.coverage.report]
 omit = [
   ".env/*",
-  "project_euler/*"
+  "project_euler/*",
 ]
 sort = "Cover"