Add lempel ziv compression (#2107)

* Added lempel-ziv compression algorithm implementation

* Added lempel-ziv decompression algorithm implementation

* Reformatted lempel-ziv compress/decompress files using black

* Added type hints and some other modifications (Doctests coming up)

* Shortened several lines to comply with the standards
This commit is contained in:
Ioane Margiani 2020-06-17 23:12:48 +04:00 committed by GitHub
parent 671e570c35
commit 19b713aecb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 236 additions and 0 deletions

125
compression/lempel_ziv.py Normal file
View File

@ -0,0 +1,125 @@
"""
One of the several implementations of LempelZivWelch compression algorithm
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch
"""
import math
import os
import sys
def read_file_binary(file_path: str) -> str:
"""
Reads given file as bytes and returns them as a long string
"""
result = ""
try:
with open(file_path, "rb") as binary_file:
data = binary_file.read()
for dat in data:
curr_byte = "{0:08b}".format(dat)
result += curr_byte
return result
except IOError:
print("File not accessible")
sys.exit()
def add_key_to_lexicon(
lexicon: dict, curr_string: str, index: int, last_match_id: int
) -> None:
"""
Adds new strings (curr_string + "0", curr_string + "1") to the lexicon
"""
lexicon.pop(curr_string)
lexicon[curr_string + "0"] = last_match_id
if math.log2(index).is_integer():
for curr_key in lexicon:
lexicon[curr_key] = "0" + lexicon[curr_key]
lexicon[curr_string + "1"] = bin(index)[2:]
def compress_data(data_bits: str) -> str:
"""
Compresses given data_bits using LempelZivWelch compression algorithm
and returns the result as a string
"""
lexicon = {"0": "0", "1": "1"}
result, curr_string = "", ""
index = len(lexicon)
for i in range(len(data_bits)):
curr_string += data_bits[i]
if curr_string not in lexicon:
continue
last_match_id = lexicon[curr_string]
result += last_match_id
add_key_to_lexicon(lexicon, curr_string, index, last_match_id)
index += 1
curr_string = ""
while curr_string != "" and curr_string not in lexicon:
curr_string += "0"
if curr_string != "":
last_match_id = lexicon[curr_string]
result += last_match_id
return result
def add_file_length(source_path: str, compressed: str) -> str:
"""
Adds given file's length in front (using Elias gamma coding) of the compressed
string
"""
file_length = os.path.getsize(source_path)
file_length_binary = bin(file_length)[2:]
length_length = len(file_length_binary)
return "0" * (length_length - 1) + file_length_binary + compressed
def write_file_binary(file_path: str, to_write: str) -> None:
"""
Writes given to_write string (should only consist of 0's and 1's) as bytes in the
file
"""
byte_length = 8
try:
with open(file_path, "wb") as opened_file:
result_byte_array = [
to_write[i : i + byte_length]
for i in range(0, len(to_write), byte_length)
]
if len(result_byte_array[-1]) % byte_length == 0:
result_byte_array.append("10000000")
else:
result_byte_array[-1] += "1" + "0" * (
byte_length - len(result_byte_array[-1]) - 1
)
for elem in result_byte_array:
opened_file.write(int(elem, 2).to_bytes(1, byteorder="big"))
except IOError:
print("File not accessible")
sys.exit()
def compress(source_path, destination_path: str) -> None:
"""
Reads source file, compresses it and writes the compressed result in destination
file
"""
data_bits = read_file_binary(source_path)
compressed = compress_data(data_bits)
compressed = add_file_length(source_path, compressed)
write_file_binary(destination_path, compressed)
if __name__ == "__main__":
compress(sys.argv[1], sys.argv[2])

View File

@ -0,0 +1,111 @@
"""
One of the several implementations of LempelZivWelch decompression algorithm
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch
"""
import math
import sys
def read_file_binary(file_path: str) -> str:
"""
Reads given file as bytes and returns them as a long string
"""
result = ""
try:
with open(file_path, "rb") as binary_file:
data = binary_file.read()
for dat in data:
curr_byte = "{0:08b}".format(dat)
result += curr_byte
return result
except IOError:
print("File not accessible")
sys.exit()
def decompress_data(data_bits: str) -> str:
"""
Decompresses given data_bits using LempelZivWelch compression algorithm
and returns the result as a string
"""
lexicon = {"0": "0", "1": "1"}
result, curr_string = "", ""
index = len(lexicon)
for i in range(len(data_bits)):
curr_string += data_bits[i]
if curr_string not in lexicon:
continue
last_match_id = lexicon[curr_string]
result += last_match_id
lexicon[curr_string] = last_match_id + "0"
if math.log2(index).is_integer():
newLex = {}
for curr_key in list(lexicon):
newLex["0" + curr_key] = lexicon.pop(curr_key)
lexicon = newLex
lexicon[bin(index)[2:]] = last_match_id + "1"
index += 1
curr_string = ""
return result
def write_file_binary(file_path: str, to_write: str) -> None:
"""
Writes given to_write string (should only consist of 0's and 1's) as bytes in the
file
"""
byte_length = 8
try:
with open(file_path, "wb") as opened_file:
result_byte_array = [
to_write[i : i + byte_length]
for i in range(0, len(to_write), byte_length)
]
if len(result_byte_array[-1]) % byte_length == 0:
result_byte_array.append("10000000")
else:
result_byte_array[-1] += "1" + "0" * (
byte_length - len(result_byte_array[-1]) - 1
)
for elem in result_byte_array[:-1]:
opened_file.write(int(elem, 2).to_bytes(1, byteorder="big"))
except IOError:
print("File not accessible")
sys.exit()
def remove_prefix(data_bits: str) -> str:
"""
Removes size prefix, that compressed file should have
Returns the result
"""
counter = 0
for letter in data_bits:
if letter == "1":
break
counter += 1
data_bits = data_bits[counter:]
data_bits = data_bits[counter + 1 :]
return data_bits
def compress(source_path: str, destination_path: str) -> None:
"""
Reads source file, decompresses it and writes the result in destination file
"""
data_bits = read_file_binary(source_path)
data_bits = remove_prefix(data_bits)
decompressed = decompress_data(data_bits)
write_file_binary(destination_path, decompressed)
if __name__ == "__main__":
compress(sys.argv[1], sys.argv[2])