diff --git a/other/token_bucket.py b/other/token_bucket.py new file mode 100644 index 000000000..b188c73d2 --- /dev/null +++ b/other/token_bucket.py @@ -0,0 +1,97 @@ +# Implementation of Token Bucket Algorithm +# Token `rate` is added to the bucket every `frequency` in seconds. +# The bucket can hold tokens up to `capacity` (full). +# The bucket starts full. +# Each request consume one token. +# If a token arrives when the bucket is full, token is discarded. +# If a request arrives when bucket is empty, request will be discarded. +# If bucket has tokens available, requests will pass. +# https://en.wikipedia.org/wiki/Token_bucket +import threading +import time + + +class TokenBucketRateLimiter: + def __init__(self, rate: int, capacity: int, frequency: int) -> None: + """ + Initialize a Token Bucket rate limiter. + + :param rate: Number of tokens added to the bucket per refill + :param capacity: Maximum number of tokens the bucket can hold. + :param frequency: Frequency of refill in seconds + >>> bucket = TokenBucketRateLimiter(4, 4, 60) + >>> bucket.tokens + 4 + >>> bucket.capacity + 4 + >>> bucket.frequency + 60 + """ + self.rate = rate # Tokens added per refill + self.capacity = capacity # Maximum capacity of the bucket + self.frequency = frequency # Frequency tokens are refilled + self.tokens = capacity # Current tokens in the bucket + self.last_checked = time.time() # Time when tokens were last checked + self.lock = threading.Lock() # To make the rate limiter thread-safe + + def _add_tokens(self) -> None: + """ + Refill tokens only when a full minute has passed. + >>> bucket = TokenBucketRateLimiter(1, 4, 60) + >>> bucket.tokens # Initially has 4 token (rate) + 4 + >>> bucket._add_tokens() + >>> bucket.tokens # Bucket already full + 4 + """ + current_time = time.time() + elapsed_time = current_time - self.last_checked + + if elapsed_time >= self.frequency: + minutes_passed = int(elapsed_time // self.frequency) + + # Add tokens based on rate + added_tokens = minutes_passed * self.rate + self.tokens = min(self.capacity, self.tokens + added_tokens) + + # Update the last checked time + self.last_checked += minutes_passed * self.frequency + + def allow_request(self) -> bool: + """ + Check if a request is allowed. + If there are enough tokens, it consumes one token. + :return: True if the request is allowed, False otherwise. + >>> bucket = TokenBucketRateLimiter(1, 2, 60) + >>> bucket.allow_request() # Token is available, request passes + True + >>> bucket.allow_request() # Token is available, request passes + True + >>> bucket.allow_request() # No token left, request is dropped + False + """ + with self.lock: + self._add_tokens() + if self.tokens >= 1: + self.tokens -= 1 + return True + return False + + +if __name__ == "__main__": + import doctest + + doctest.testmod() + + print("Allow 4 requests per minute, capacity of 4") + bucket = TokenBucketRateLimiter(4, 4, 60) + total_requests = 10 + delay_in_seconds = 10 + print("Simulate 1 request per 10 seconds...") + for i in range(total_requests): + result = "pass" if bucket.allow_request() else "dropped" + print( + f"Request {i+1}/{total_requests} \ + timeline: {i*delay_in_seconds} seconds = {result}" + ) + time.sleep(delay_in_seconds)