mirror of
https://github.com/hastagAB/Awesome-Python-Scripts.git
synced 2025-01-17 23:07:00 +00:00
[+] Added : New Script To Manage CSV For Storage
-> will allow the user to manage CSV files -> filter data based on certain conditions. -> update data at particular row in csv.
This commit is contained in:
parent
0879d79bfb
commit
ad1cc40043
42
Better_CSV_Storage/README.md
Normal file
42
Better_CSV_Storage/README.md
Normal file
|
@ -0,0 +1,42 @@
|
|||
p
|
||||
|
||||
# Better CSV Storage
|
||||
|
||||
-> For User / Programmer who don't want to use databases and want to use csv instead, this script will help manage that csv file (storage)
|
||||
|
||||
|
||||
|
||||
### What you Can Do ?
|
||||
|
||||
-> Filter Data Based On Certain Conditions that you provide,
|
||||
|
||||
-> Update Certain Value in particular row.
|
||||
|
||||
|
||||
|
||||
### How to use ?
|
||||
|
||||
-> It's OOP based Script so just import that in you Regular Script.
|
||||
|
||||
-> For Example,
|
||||
|
||||
-> I want to check and download_status of files.
|
||||
|
||||
-> csv file i have used -> `dummy_data.csv`
|
||||
|
||||
```python
|
||||
csv_path = 'dummy_data.csv'
|
||||
|
||||
#init class and get the object
|
||||
csv_obj = BetterCSVStorage(csv_path)
|
||||
|
||||
#Now user that object as storage
|
||||
|
||||
#Filter Data Based on Different parameters
|
||||
#This List will contain additional ( update_index ) which is row index of each row in csv.
|
||||
#This update_index will be use to update value of certain index.
|
||||
filtered_data = csv_obj.get_filtered_data('download_status', '==', '')
|
||||
|
||||
#Change Data Based on Different parameters
|
||||
csv_obj.update_data(10,'download_status', 'done')
|
||||
```
|
95
Better_CSV_Storage/better_csv_storage.py
Normal file
95
Better_CSV_Storage/better_csv_storage.py
Normal file
|
@ -0,0 +1,95 @@
|
|||
import csv
|
||||
|
||||
class BetterCSVStorage():
|
||||
|
||||
def __init__(self, csv_path):
|
||||
self.csv_f_path = csv_path
|
||||
self.valid_headers = None
|
||||
self.allowed_comparison = ['==', '!=']
|
||||
|
||||
def load(self):
|
||||
with open(self.csv_f_path, 'r') as csv_f:
|
||||
csv_reader = csv.reader(csv_f)
|
||||
self.valid_headers = next(csv_reader)
|
||||
|
||||
def write_dict_csv(self, data_rows):
|
||||
if data_rows:
|
||||
field_names = list(data_rows[0].keys())
|
||||
with open(self.csv_f_path,'w') as csv_wf:
|
||||
csv_writer = csv.DictWriter(csv_wf, fieldnames=field_names)
|
||||
csv_writer.writeheader()
|
||||
for row in data_rows:
|
||||
csv_writer.writerow(row)
|
||||
print('[+] Data Written Successfully ...')
|
||||
else:
|
||||
print('[-] Error : Data Rows Could not be empty ...')
|
||||
|
||||
def get_filtered_data(self, col_name, comparison, value):
|
||||
if not self.valid_headers:
|
||||
self.load()
|
||||
if not col_name in self.valid_headers:
|
||||
print('[-] Error : Enter Valid Column Name.')
|
||||
print('[*] Info : Allowed Column Names Are : {}'.format(', '.join(self.valid_headers)))
|
||||
else:
|
||||
if not comparison in self.allowed_comparison:
|
||||
print('[-] Error : Invalid Comparison.')
|
||||
print('[*] Info : Allowed Comparison Are : {}'.format(', '.join(self.allowed_comparison)))
|
||||
else:
|
||||
filtered_data = []
|
||||
with open(self.csv_f_path,'r') as c_file:
|
||||
csv_reader = csv.DictReader(c_file)
|
||||
for row_index, row in enumerate(csv_reader):
|
||||
try:
|
||||
if (comparison == '=='):
|
||||
if row[col_name] == value:
|
||||
row['update_index'] = row_index
|
||||
filtered_data.append(row)
|
||||
if (comparison == '!='):
|
||||
if row[col_name] != value:
|
||||
row['update_index'] = row_index
|
||||
filtered_data.append(row)
|
||||
except KeyError:
|
||||
continue
|
||||
return filtered_data
|
||||
|
||||
def update_data(self, update_index, col_name, value):
|
||||
if not self.valid_headers:
|
||||
self.load()
|
||||
if not col_name in self.valid_headers:
|
||||
print('[-] Error : Enter Valid Column Name.')
|
||||
print('[*] Info : Allowed Column Names Are : {}'.format(', '.join(self.valid_headers)))
|
||||
else:
|
||||
if not update_index:
|
||||
print('[-] Error Valid Data Index ....')
|
||||
else:
|
||||
try:
|
||||
update_index = int(update_index)
|
||||
except:
|
||||
print('[-] Error : Update Index is Not Valid')
|
||||
return
|
||||
updated_rows = []
|
||||
with open(self.csv_f_path,'r') as csv_rf:
|
||||
csv_reader = csv.DictReader(csv_rf)
|
||||
for row_index, row in enumerate(csv_reader):
|
||||
if update_index == row_index:
|
||||
print('[+] Updating Index {}'.format(update_index))
|
||||
row[col_name] = value
|
||||
updated_rows.append(row)
|
||||
self.write_dict_csv(updated_rows)
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
csv_path = 'dummy_data.csv'
|
||||
|
||||
#init class and get the object
|
||||
csv_obj = BetterCSVStorage(csv_path)
|
||||
|
||||
#Now user that object as storage
|
||||
|
||||
#Filter Data Based on Different parameters
|
||||
#This List will contain additional ( update_index ) which is row index of each row in csv.
|
||||
#This update_index will be use to update value of certain index.
|
||||
filtered_data = csv_obj.get_filtered_data('download_status', '==', '')
|
||||
|
||||
#Change Data Based on Different parameters
|
||||
csv_obj.update_data(4,'download_status', 'done')
|
7
Better_CSV_Storage/dummy_data.csv
Normal file
7
Better_CSV_Storage/dummy_data.csv
Normal file
|
@ -0,0 +1,7 @@
|
|||
id,file_name,file_url,download_date,download_status
|
||||
1430,sample_video1.mp4,https://example.com/download/sample_video1.mp4,05/10/22,
|
||||
239,sample_video2.mp4,https://example.com/download/sample_video2.mp4,05/10/22,
|
||||
3421,sample_video3.mp4,https://example.com/download/sample_video3.mp4,05/10/22,
|
||||
1305,sample_video4.mp4,https://example.com/download/sample_video4.mp4,05/10/22,
|
||||
6375,sample_video5.mp4,https://example.com/download/sample_video5.mp4,06/10/22,done
|
||||
1203,sample_video6.mp4,https://example.com/download/sample_video6.mp4,06/10/22,done
|
|
Loading…
Reference in New Issue
Block a user