mirror of
https://github.com/hastagAB/Awesome-Python-Scripts.git
synced 2024-11-30 15:31:07 +00:00
96 lines
3.9 KiB
Python
96 lines
3.9 KiB
Python
|
import csv
|
||
|
|
||
|
class BetterCSVStorage():
|
||
|
|
||
|
def __init__(self, csv_path):
|
||
|
self.csv_f_path = csv_path
|
||
|
self.valid_headers = None
|
||
|
self.allowed_comparison = ['==', '!=']
|
||
|
|
||
|
def load(self):
|
||
|
with open(self.csv_f_path, 'r') as csv_f:
|
||
|
csv_reader = csv.reader(csv_f)
|
||
|
self.valid_headers = next(csv_reader)
|
||
|
|
||
|
def write_dict_csv(self, data_rows):
|
||
|
if data_rows:
|
||
|
field_names = list(data_rows[0].keys())
|
||
|
with open(self.csv_f_path,'w') as csv_wf:
|
||
|
csv_writer = csv.DictWriter(csv_wf, fieldnames=field_names)
|
||
|
csv_writer.writeheader()
|
||
|
for row in data_rows:
|
||
|
csv_writer.writerow(row)
|
||
|
print('[+] Data Written Successfully ...')
|
||
|
else:
|
||
|
print('[-] Error : Data Rows Could not be empty ...')
|
||
|
|
||
|
def get_filtered_data(self, col_name, comparison, value):
|
||
|
if not self.valid_headers:
|
||
|
self.load()
|
||
|
if not col_name in self.valid_headers:
|
||
|
print('[-] Error : Enter Valid Column Name.')
|
||
|
print('[*] Info : Allowed Column Names Are : {}'.format(', '.join(self.valid_headers)))
|
||
|
else:
|
||
|
if not comparison in self.allowed_comparison:
|
||
|
print('[-] Error : Invalid Comparison.')
|
||
|
print('[*] Info : Allowed Comparison Are : {}'.format(', '.join(self.allowed_comparison)))
|
||
|
else:
|
||
|
filtered_data = []
|
||
|
with open(self.csv_f_path,'r') as c_file:
|
||
|
csv_reader = csv.DictReader(c_file)
|
||
|
for row_index, row in enumerate(csv_reader):
|
||
|
try:
|
||
|
if (comparison == '=='):
|
||
|
if row[col_name] == value:
|
||
|
row['update_index'] = row_index
|
||
|
filtered_data.append(row)
|
||
|
if (comparison == '!='):
|
||
|
if row[col_name] != value:
|
||
|
row['update_index'] = row_index
|
||
|
filtered_data.append(row)
|
||
|
except KeyError:
|
||
|
continue
|
||
|
return filtered_data
|
||
|
|
||
|
def update_data(self, update_index, col_name, value):
|
||
|
if not self.valid_headers:
|
||
|
self.load()
|
||
|
if not col_name in self.valid_headers:
|
||
|
print('[-] Error : Enter Valid Column Name.')
|
||
|
print('[*] Info : Allowed Column Names Are : {}'.format(', '.join(self.valid_headers)))
|
||
|
else:
|
||
|
if not update_index:
|
||
|
print('[-] Error Valid Data Index ....')
|
||
|
else:
|
||
|
try:
|
||
|
update_index = int(update_index)
|
||
|
except:
|
||
|
print('[-] Error : Update Index is Not Valid')
|
||
|
return
|
||
|
updated_rows = []
|
||
|
with open(self.csv_f_path,'r') as csv_rf:
|
||
|
csv_reader = csv.DictReader(csv_rf)
|
||
|
for row_index, row in enumerate(csv_reader):
|
||
|
if update_index == row_index:
|
||
|
print('[+] Updating Index {}'.format(update_index))
|
||
|
row[col_name] = value
|
||
|
updated_rows.append(row)
|
||
|
self.write_dict_csv(updated_rows)
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
|
||
|
csv_path = 'dummy_data.csv'
|
||
|
|
||
|
#init class and get the object
|
||
|
csv_obj = BetterCSVStorage(csv_path)
|
||
|
|
||
|
#Now user that object as storage
|
||
|
|
||
|
#Filter Data Based on Different parameters
|
||
|
#This List will contain additional ( update_index ) which is row index of each row in csv.
|
||
|
#This update_index will be use to update value of certain index.
|
||
|
filtered_data = csv_obj.get_filtered_data('download_status', '==', '')
|
||
|
|
||
|
#Change Data Based on Different parameters
|
||
|
csv_obj.update_data(4,'download_status', 'done')
|