mirror of
https://github.com/TheAlgorithms/Python.git
synced 2024-11-27 15:01:08 +00:00
Add LZ77 compression algorithm (#8059)
* - add "lz77_compressor" class with compress and decompress methods using LZ77 compression algorithm * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * - use "list" instead "List", formatting * - fix spelling * - add Python type hints * - add 'Token' class to represent triplet (offset, length, indicator) * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * - add test, hange type rom List to list * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * - remove extra import * - remove extra types in comments * - better test * - edit comments * - add return types * - add tests for __str__ and __repr__ * Update lz77.py * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Christian Clauss <cclauss@me.com>
This commit is contained in:
parent
27d56ba393
commit
90686e39b9
227
compression/lz77.py
Normal file
227
compression/lz77.py
Normal file
|
@ -0,0 +1,227 @@
|
|||
"""
|
||||
LZ77 compression algorithm
|
||||
- lossless data compression published in papers by Abraham Lempel and Jacob Ziv in 1977
|
||||
- also known as LZ1 or sliding-window compression
|
||||
- form the basis for many variations including LZW, LZSS, LZMA and others
|
||||
|
||||
It uses a “sliding window” method. Within the sliding window we have:
|
||||
- search buffer
|
||||
- look ahead buffer
|
||||
len(sliding_window) = len(search_buffer) + len(look_ahead_buffer)
|
||||
|
||||
LZ77 manages a dictionary that uses triples composed of:
|
||||
- Offset into search buffer, it's the distance between the start of a phrase and
|
||||
the beginning of a file.
|
||||
- Length of the match, it's the number of characters that make up a phrase.
|
||||
- The indicator is represented by a character that is going to be encoded next.
|
||||
|
||||
As a file is parsed, the dictionary is dynamically updated to reflect the compressed
|
||||
data contents and size.
|
||||
|
||||
Examples:
|
||||
"cabracadabrarrarrad" <-> [(0, 0, 'c'), (0, 0, 'a'), (0, 0, 'b'), (0, 0, 'r'),
|
||||
(3, 1, 'c'), (2, 1, 'd'), (7, 4, 'r'), (3, 5, 'd')]
|
||||
"ababcbababaa" <-> [(0, 0, 'a'), (0, 0, 'b'), (2, 2, 'c'), (4, 3, 'a'), (2, 2, 'a')]
|
||||
"aacaacabcabaaac" <-> [(0, 0, 'a'), (1, 1, 'c'), (3, 4, 'b'), (3, 3, 'a'), (1, 2, 'c')]
|
||||
|
||||
Sources:
|
||||
en.wikipedia.org/wiki/LZ77_and_LZ78
|
||||
"""
|
||||
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
__version__ = "0.1"
|
||||
__author__ = "Lucia Harcekova"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Token:
|
||||
"""
|
||||
Dataclass representing triplet called token consisting of length, offset
|
||||
and indicator. This triplet is used during LZ77 compression.
|
||||
"""
|
||||
|
||||
offset: int
|
||||
length: int
|
||||
indicator: str
|
||||
|
||||
def __repr__(self) -> str:
|
||||
"""
|
||||
>>> token = Token(1, 2, "c")
|
||||
>>> repr(token)
|
||||
'(1, 2, c)'
|
||||
>>> str(token)
|
||||
'(1, 2, c)'
|
||||
"""
|
||||
return f"({self.offset}, {self.length}, {self.indicator})"
|
||||
|
||||
|
||||
class LZ77Compressor:
|
||||
"""
|
||||
Class containing compress and decompress methods using LZ77 compression algorithm.
|
||||
"""
|
||||
|
||||
def __init__(self, window_size: int = 13, lookahead_buffer_size: int = 6) -> None:
|
||||
self.window_size = window_size
|
||||
self.lookahead_buffer_size = lookahead_buffer_size
|
||||
self.search_buffer_size = self.window_size - self.lookahead_buffer_size
|
||||
|
||||
def compress(self, text: str) -> list[Token]:
|
||||
"""
|
||||
Compress the given string text using LZ77 compression algorithm.
|
||||
|
||||
Args:
|
||||
text: string to be compressed
|
||||
|
||||
Returns:
|
||||
output: the compressed text as a list of Tokens
|
||||
|
||||
>>> lz77_compressor = LZ77Compressor()
|
||||
>>> str(lz77_compressor.compress("ababcbababaa"))
|
||||
'[(0, 0, a), (0, 0, b), (2, 2, c), (4, 3, a), (2, 2, a)]'
|
||||
>>> str(lz77_compressor.compress("aacaacabcabaaac"))
|
||||
'[(0, 0, a), (1, 1, c), (3, 4, b), (3, 3, a), (1, 2, c)]'
|
||||
"""
|
||||
|
||||
output = []
|
||||
search_buffer = ""
|
||||
|
||||
# while there are still characters in text to compress
|
||||
while text:
|
||||
|
||||
# find the next encoding phrase
|
||||
# - triplet with offset, length, indicator (the next encoding character)
|
||||
token = self._find_encoding_token(text, search_buffer)
|
||||
|
||||
# update the search buffer:
|
||||
# - add new characters from text into it
|
||||
# - check if size exceed the max search buffer size, if so, drop the
|
||||
# oldest elements
|
||||
search_buffer += text[: token.length + 1]
|
||||
if len(search_buffer) > self.search_buffer_size:
|
||||
search_buffer = search_buffer[-self.search_buffer_size :]
|
||||
|
||||
# update the text
|
||||
text = text[token.length + 1 :]
|
||||
|
||||
# append the token to output
|
||||
output.append(token)
|
||||
|
||||
return output
|
||||
|
||||
def decompress(self, tokens: list[Token]) -> str:
|
||||
"""
|
||||
Convert the list of tokens into an output string.
|
||||
|
||||
Args:
|
||||
tokens: list containing triplets (offset, length, char)
|
||||
|
||||
Returns:
|
||||
output: decompressed text
|
||||
|
||||
Tests:
|
||||
>>> lz77_compressor = LZ77Compressor()
|
||||
>>> lz77_compressor.decompress([Token(0, 0, 'c'), Token(0, 0, 'a'),
|
||||
... Token(0, 0, 'b'), Token(0, 0, 'r'), Token(3, 1, 'c'),
|
||||
... Token(2, 1, 'd'), Token(7, 4, 'r'), Token(3, 5, 'd')])
|
||||
'cabracadabrarrarrad'
|
||||
>>> lz77_compressor.decompress([Token(0, 0, 'a'), Token(0, 0, 'b'),
|
||||
... Token(2, 2, 'c'), Token(4, 3, 'a'), Token(2, 2, 'a')])
|
||||
'ababcbababaa'
|
||||
>>> lz77_compressor.decompress([Token(0, 0, 'a'), Token(1, 1, 'c'),
|
||||
... Token(3, 4, 'b'), Token(3, 3, 'a'), Token(1, 2, 'c')])
|
||||
'aacaacabcabaaac'
|
||||
"""
|
||||
|
||||
output = ""
|
||||
|
||||
for token in tokens:
|
||||
for _ in range(token.length):
|
||||
output += output[-token.offset]
|
||||
output += token.indicator
|
||||
|
||||
return output
|
||||
|
||||
def _find_encoding_token(self, text: str, search_buffer: str) -> Token:
|
||||
"""Finds the encoding token for the first character in the text.
|
||||
|
||||
Tests:
|
||||
>>> lz77_compressor = LZ77Compressor()
|
||||
>>> lz77_compressor._find_encoding_token("abrarrarrad", "abracad").offset
|
||||
7
|
||||
>>> lz77_compressor._find_encoding_token("adabrarrarrad", "cabrac").length
|
||||
1
|
||||
>>> lz77_compressor._find_encoding_token("abc", "xyz").offset
|
||||
0
|
||||
>>> lz77_compressor._find_encoding_token("", "xyz").offset
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
ValueError: We need some text to work with.
|
||||
>>> lz77_compressor._find_encoding_token("abc", "").offset
|
||||
0
|
||||
"""
|
||||
|
||||
if not text:
|
||||
raise ValueError("We need some text to work with.")
|
||||
|
||||
# Initialise result parameters to default values
|
||||
length, offset = 0, 0
|
||||
|
||||
if not search_buffer:
|
||||
return Token(offset, length, text[length])
|
||||
|
||||
for i, character in enumerate(search_buffer):
|
||||
found_offset = len(search_buffer) - i
|
||||
if character == text[0]:
|
||||
found_length = self._match_length_from_index(text, search_buffer, 0, i)
|
||||
# if the found length is bigger than the current or if it's equal,
|
||||
# which means it's offset is smaller: update offset and length
|
||||
if found_length >= length:
|
||||
offset, length = found_offset, found_length
|
||||
|
||||
return Token(offset, length, text[length])
|
||||
|
||||
def _match_length_from_index(
|
||||
self, text: str, window: str, text_index: int, window_index: int
|
||||
) -> int:
|
||||
"""Calculate the longest possible match of text and window characters from
|
||||
text_index in text and window_index in window.
|
||||
|
||||
Args:
|
||||
text: _description_
|
||||
window: sliding window
|
||||
text_index: index of character in text
|
||||
window_index: index of character in sliding window
|
||||
|
||||
Returns:
|
||||
The maximum match between text and window, from given indexes.
|
||||
|
||||
Tests:
|
||||
>>> lz77_compressor = LZ77Compressor(13, 6)
|
||||
>>> lz77_compressor._match_length_from_index("rarrad", "adabrar", 0, 4)
|
||||
5
|
||||
>>> lz77_compressor._match_length_from_index("adabrarrarrad",
|
||||
... "cabrac", 0, 1)
|
||||
1
|
||||
"""
|
||||
if not text or text[text_index] != window[window_index]:
|
||||
return 0
|
||||
return 1 + self._match_length_from_index(
|
||||
text, window + text[text_index], text_index + 1, window_index + 1
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from doctest import testmod
|
||||
|
||||
testmod()
|
||||
# Initialize compressor class
|
||||
lz77_compressor = LZ77Compressor(window_size=13, lookahead_buffer_size=6)
|
||||
|
||||
# Example
|
||||
TEXT = "cabracadabrarrarrad"
|
||||
compressed_text = lz77_compressor.compress(TEXT)
|
||||
print(lz77_compressor.compress("ababcbababaa"))
|
||||
decompressed_text = lz77_compressor.decompress(compressed_text)
|
||||
assert decompressed_text == TEXT, "The LZ77 algorithm returned the invalid result."
|
Loading…
Reference in New Issue
Block a user