Awesome-Python-Scripts/Better_CSV_Storage/better_csv_storage.py

96 lines
3.9 KiB
Python
Raw Normal View History

import csv
class BetterCSVStorage():
def __init__(self, csv_path):
self.csv_f_path = csv_path
self.valid_headers = None
self.allowed_comparison = ['==', '!=']
def load(self):
with open(self.csv_f_path, 'r') as csv_f:
csv_reader = csv.reader(csv_f)
self.valid_headers = next(csv_reader)
def write_dict_csv(self, data_rows):
if data_rows:
field_names = list(data_rows[0].keys())
with open(self.csv_f_path,'w') as csv_wf:
csv_writer = csv.DictWriter(csv_wf, fieldnames=field_names)
csv_writer.writeheader()
for row in data_rows:
csv_writer.writerow(row)
print('[+] Data Written Successfully ...')
else:
print('[-] Error : Data Rows Could not be empty ...')
def get_filtered_data(self, col_name, comparison, value):
if not self.valid_headers:
self.load()
if not col_name in self.valid_headers:
print('[-] Error : Enter Valid Column Name.')
print('[*] Info : Allowed Column Names Are : {}'.format(', '.join(self.valid_headers)))
else:
if not comparison in self.allowed_comparison:
print('[-] Error : Invalid Comparison.')
print('[*] Info : Allowed Comparison Are : {}'.format(', '.join(self.allowed_comparison)))
else:
filtered_data = []
with open(self.csv_f_path,'r') as c_file:
csv_reader = csv.DictReader(c_file)
for row_index, row in enumerate(csv_reader):
try:
if (comparison == '=='):
if row[col_name] == value:
row['update_index'] = row_index
filtered_data.append(row)
if (comparison == '!='):
if row[col_name] != value:
row['update_index'] = row_index
filtered_data.append(row)
except KeyError:
continue
return filtered_data
def update_data(self, update_index, col_name, value):
if not self.valid_headers:
self.load()
if not col_name in self.valid_headers:
print('[-] Error : Enter Valid Column Name.')
print('[*] Info : Allowed Column Names Are : {}'.format(', '.join(self.valid_headers)))
else:
if not update_index:
print('[-] Error Valid Data Index ....')
else:
try:
update_index = int(update_index)
except:
print('[-] Error : Update Index is Not Valid')
return
updated_rows = []
with open(self.csv_f_path,'r') as csv_rf:
csv_reader = csv.DictReader(csv_rf)
for row_index, row in enumerate(csv_reader):
if update_index == row_index:
print('[+] Updating Index {}'.format(update_index))
row[col_name] = value
updated_rows.append(row)
self.write_dict_csv(updated_rows)
if __name__ == '__main__':
csv_path = 'dummy_data.csv'
#init class and get the object
csv_obj = BetterCSVStorage(csv_path)
#Now user that object as storage
#Filter Data Based on Different parameters
#This List will contain additional ( update_index ) which is row index of each row in csv.
#This update_index will be use to update value of certain index.
filtered_data = csv_obj.get_filtered_data('download_status', '==', '')
#Change Data Based on Different parameters
csv_obj.update_data(4,'download_status', 'done')