Python/sorts/external_sort.py

159 lines
4.3 KiB
Python
Raw Normal View History

2017-10-20 12:33:43 +00:00
#!/usr/bin/env python
#
# Sort large text files in a minimum amount of memory
#
import os
import argparse
class FileSplitter(object):
BLOCK_FILENAME_FORMAT = 'block_{0}.dat'
def __init__(self, filename):
self.filename = filename
self.block_filenames = []
def write_block(self, data, block_number):
filename = self.BLOCK_FILENAME_FORMAT.format(block_number)
with open(filename, 'w') as file:
file.write(data)
2017-10-20 12:33:43 +00:00
self.block_filenames.append(filename)
def get_block_filenames(self):
return self.block_filenames
def split(self, block_size, sort_key=None):
i = 0
with open(self.filename) as file:
while True:
lines = file.readlines(block_size)
2017-10-20 12:33:43 +00:00
if lines == []:
break
2017-10-20 12:33:43 +00:00
if sort_key is None:
lines.sort()
else:
lines.sort(key=sort_key)
2017-10-20 12:33:43 +00:00
self.write_block(''.join(lines), i)
i += 1
2017-10-20 12:33:43 +00:00
def cleanup(self):
map(lambda f: os.remove(f), self.block_filenames)
class NWayMerge(object):
def select(self, choices):
min_index = -1
min_str = None
for i in range(len(choices)):
if min_str is None or choices[i] < min_str:
min_index = i
return min_index
class FilesArray(object):
def __init__(self, files):
self.files = files
self.empty = set()
self.num_buffers = len(files)
self.buffers = {i: None for i in range(self.num_buffers)}
def get_dict(self):
return {i: self.buffers[i] for i in range(self.num_buffers) if i not in self.empty}
def refresh(self):
for i in range(self.num_buffers):
if self.buffers[i] is None and i not in self.empty:
self.buffers[i] = self.files[i].readline()
if self.buffers[i] == '':
self.empty.add(i)
self.files[i].close()
2017-10-20 12:33:43 +00:00
if len(self.empty) == self.num_buffers:
return False
return True
def unshift(self, index):
value = self.buffers[index]
self.buffers[index] = None
return value
class FileMerger(object):
def __init__(self, merge_strategy):
self.merge_strategy = merge_strategy
def merge(self, filenames, outfilename, buffer_size):
buffers = FilesArray(self.get_file_handles(filenames, buffer_size))
with open(outfilename, 'w', buffer_size) as outfile:
while buffers.refresh():
min_index = self.merge_strategy.select(buffers.get_dict())
outfile.write(buffers.unshift(min_index))
2017-10-20 12:33:43 +00:00
def get_file_handles(self, filenames, buffer_size):
files = {}
for i in range(len(filenames)):
files[i] = open(filenames[i], 'r', buffer_size)
return files
class ExternalSort(object):
def __init__(self, block_size):
self.block_size = block_size
def sort(self, filename, sort_key=None):
num_blocks = self.get_number_blocks(filename, self.block_size)
splitter = FileSplitter(filename)
splitter.split(self.block_size, sort_key)
merger = FileMerger(NWayMerge())
buffer_size = self.block_size / (num_blocks + 1)
merger.merge(splitter.get_block_filenames(), filename + '.out', buffer_size)
splitter.cleanup()
def get_number_blocks(self, filename, block_size):
return (os.stat(filename).st_size / block_size) + 1
def parse_memory(string):
if string[-1].lower() == 'k':
return int(string[:-1]) * 1024
elif string[-1].lower() == 'm':
return int(string[:-1]) * 1024 * 1024
elif string[-1].lower() == 'g':
return int(string[:-1]) * 1024 * 1024 * 1024
else:
return int(string)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-m',
'--mem',
help='amount of memory to use for sorting',
default='100M')
parser.add_argument('filename',
metavar='<filename>',
nargs=1,
help='name of file to sort')
args = parser.parse_args()
sorter = ExternalSort(parse_memory(args.mem))
sorter.sort(args.filename[0])
if __name__ == '__main__':
main()