mirror of
https://github.com/hastagAB/Awesome-Python-Scripts.git
synced 2024-11-27 14:01:09 +00:00
213 lines
8.4 KiB
Python
213 lines
8.4 KiB
Python
from dataclasses import field, fields
|
|
import sqlite3
|
|
from unittest import result
|
|
|
|
class CustomSqliteAction():
|
|
def __init__(self,database_name, database_table, database_fields):
|
|
self.database = database_name
|
|
self.table_name = database_table
|
|
self.fields = database_fields
|
|
|
|
self.db_conn = None
|
|
self.valid_fields = None
|
|
|
|
def connect_to_db(self):
|
|
print('[+] Connecting to {}'.format(self.database))
|
|
db_conn = sqlite3.connect(self.database)
|
|
self.db_conn = db_conn
|
|
|
|
def load_table_info(self):
|
|
self.valid_fields = [f_name[0] for f_name in self.fields]
|
|
|
|
def table_validation(self,inserted_fields):
|
|
return list(set(inserted_fields).difference(set(self.valid_fields)))
|
|
|
|
def _parse_result(self,db_data):
|
|
query_set = []
|
|
for data in db_data:
|
|
data_dict = {k: v for k, v in zip(self.valid_fields, data)}
|
|
query_set.append(data_dict)
|
|
return query_set
|
|
|
|
def create_table(self):
|
|
sql_string = """CREATE TABLE IF NOT EXISTS {table_name} {field_string};"""
|
|
field_string = "("+", ".join([" ".join(fi for fi in f) for f in self.fields])+")"
|
|
sql_string = sql_string.format(table_name=self.table_name,field_string=field_string)
|
|
|
|
print("[+] Creating Table {} .....\n".format(self.table_name))
|
|
cur = self.db_conn.cursor()
|
|
cur.execute(sql_string)
|
|
|
|
def table_exists(self):
|
|
sql_string = """SELECT * FROM {}""".format(self.table_name)
|
|
try:
|
|
cur = self.db_conn.cursor()
|
|
cur.execute(sql_string)
|
|
print('[+] Connecting To Table {}\n'.format(self.table_name))
|
|
return True
|
|
except sqlite3.OperationalError:
|
|
print('[-] TABLE NAME {} DOES NOT EXISTS'.format(self.table_name))
|
|
return False
|
|
|
|
def store_data(self,**kwargs):
|
|
validation = self.table_validation(kwargs.keys())
|
|
if not validation:
|
|
sql_string = """INSERT INTO {table_name} {field_string} VALUES {value_string};"""
|
|
field_string = "("+", ".join([f for f in kwargs.keys()])+")"
|
|
value_string = "("+ ", ".join([f"'{v}'" for v in kwargs.values()]) +")"
|
|
|
|
sql_string = sql_string.format(table_name=self.table_name,field_string=field_string,value_string=value_string)
|
|
cur = self.db_conn.cursor()
|
|
try:
|
|
cur.execute(sql_string)
|
|
except sqlite3.OperationalError:
|
|
print('[-] Database Syntax Error probabily because of \' in data ')
|
|
self.db_conn.commit()
|
|
else:
|
|
print('\n[-] STORE DATA ERROR ...')
|
|
print('[-] {} IS NOT VALID FIELD NAME'.format(', '.join(validation)))
|
|
print('[-] CHOICES ARE {}'.format((', ').join(self.valid_fields)))
|
|
|
|
def delete_data(self,**kwargs):
|
|
validation = self.table_validation(kwargs.keys())
|
|
if not validation:
|
|
if len(kwargs) == 1:
|
|
sql_string = """DELETE FROM {table_name} WHERE {field} = '{field_id}';"""
|
|
sql_string = sql_string.format(table_name=self.table_name,field=list(kwargs.keys())[0],field_id=list(kwargs.values())[0])
|
|
elif len(kwargs) > 1:
|
|
inintial_string = """DELETE FROM {table_name} WHERE """.format(table_name=self.table_name)
|
|
field_string = " AND ".join(["{field} = '{field_value}'".format(field=f[0],field_value=f[1]) for f in kwargs.items() ]) + ";"
|
|
sql_string = inintial_string + field_string
|
|
else:
|
|
print('[-] At least Provide 1 Argument')
|
|
return
|
|
|
|
cur = self.db_conn.cursor()
|
|
cur.execute(sql_string)
|
|
self.db_conn.commit()
|
|
print("[+] Delete Data Successfully")
|
|
|
|
else:
|
|
print('\n[-] DELETE DATA ERROR ...')
|
|
print('[-] {} IS NOT VALID FIELD NAME'.format(', '.join(validation)))
|
|
print('[-] CHOICES ARE {}'.format((', ').join(self.valid_fields)))
|
|
|
|
def update_data(self,search_tuple, **kwargs):
|
|
validation = self.table_validation(kwargs.keys())
|
|
if not validation:
|
|
if len(kwargs) == 1:
|
|
sql_string = """UPDATE {table_name} SET {field} = '{update_value}' WHERE {p_field} = {field_id};"""
|
|
sql_string = sql_string.format(table_name=self.table_name, field=list(kwargs.keys())[0], update_value=list(kwargs.values())[0], p_field=search_tuple[0], field_id=search_tuple[1])
|
|
else:
|
|
print('[-] Only One Upadte Argument Allowed')
|
|
return
|
|
cur = self.db_conn.cursor()
|
|
cur.execute(sql_string)
|
|
self.db_conn.commit()
|
|
print("[+] Update Data Successfully")
|
|
else:
|
|
print('\n[-] DELETE DATA ERROR ...')
|
|
print('[-] {} IS NOT VALID FIELD NAME'.format(', '.join(validation)))
|
|
print('[-] CHOICES ARE {}'.format((', ').join(self.valid_fields)))
|
|
|
|
def read_data(self,**kwargs):
|
|
validation = self.table_validation(kwargs.keys())
|
|
if not validation:
|
|
if len(kwargs) == 1:
|
|
sql_string = """SELECT * FROM {table_name} WHERE {field} = '{read_value}';"""
|
|
sql_string = sql_string.format(table_name=self.table_name, field=list(kwargs.keys())[0], read_value=list(kwargs.values())[0])
|
|
elif len(kwargs) > 1:
|
|
inintial_string = """SELECT * FROM {table_name} WHERE """.format(table_name=self.table_name)
|
|
field_string = " AND ".join(["{field} = '{read_value}'".format(field=f[0],read_value=f[1]) for f in kwargs.items() ]) + ";"
|
|
sql_string = inintial_string + field_string
|
|
else:
|
|
print('[-] Provide At least One Argument')
|
|
return
|
|
|
|
cur = self.db_conn.cursor()
|
|
cur.execute(sql_string)
|
|
self.db_conn.commit()
|
|
|
|
#FETCHING DATA
|
|
result = cur.fetchall()
|
|
return self._parse_result(result)
|
|
else:
|
|
print('\n[-] READ DATA ERROR ...')
|
|
print('[-] {} IS NOT VALID FIELD NAME'.format(', '.join(validation)))
|
|
print('[-] CHOICES ARE {}'.format((', ').join(self.valid_fields)))
|
|
|
|
def read_all(self):
|
|
#PART1 : CREATING THE SQL STRING
|
|
sql_string = """SELECT * FROM {table_name};"""
|
|
sql_string = sql_string.format(table_name=self.table_name)
|
|
|
|
#PART2 : EXECUTING THAT CREARED STRING
|
|
cur = self.db_conn.cursor()
|
|
cur.execute(sql_string)
|
|
self.db_conn.commit()
|
|
|
|
#FETCHING DATA
|
|
result = cur.fetchall()
|
|
return self._parse_result(result)
|
|
|
|
def load(self):
|
|
self.connect_to_db()
|
|
if not self.table_exists():
|
|
self.create_table()
|
|
self.load_table_info()
|
|
else:
|
|
self.load_table_info()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
db_file_name = 'download_queue.db'
|
|
db_table = 'DownloadQueue'
|
|
db_fields = [
|
|
('id','integer','primary key','autoincrement'),
|
|
('url','text'),
|
|
('date','date'),
|
|
('status','text'),
|
|
|
|
]
|
|
|
|
db_obj = CustomSqliteAction(database_name=db_file_name, database_table=db_table, database_fields=db_fields)
|
|
|
|
#will create .db file if not exists
|
|
#will create table if not exists
|
|
db_obj.load()
|
|
|
|
#let's Store Some Data
|
|
#function > store_data()
|
|
#you can also use python datetime object as a date field
|
|
db_obj.store_data(url='https://m.youtube.com/video_id',date='2022-10-01')
|
|
|
|
|
|
#let's Update Some Data
|
|
#function > update_data()
|
|
db_obj.update_data(search_tuple=('id','1'), url='https://google.com/video_id')
|
|
|
|
|
|
#let's Read Some data
|
|
#function > read_data() , read_all()
|
|
|
|
#read_data()
|
|
#-> will read based on single condition or multiple condition and returns python list contaning all the data
|
|
print('Single Argument Search ...')
|
|
data = db_obj.read_data(url='https://m.youtube.com/video_id')
|
|
print(data)
|
|
|
|
print('Multiple Argument Search ...')
|
|
multi_con_data = db_obj.read_data(url='https://m.youtube.com/video_id',status='done')
|
|
print(multi_con_data)
|
|
|
|
print('Reading All Data ...')
|
|
all_data = db_obj.read_all()
|
|
print(all_data)
|
|
|
|
#let's delete Some Data
|
|
#function > delete_data()
|
|
delete_id = 1
|
|
db_obj.delete_data(id=delete_id)
|
|
db_obj.delete_data(url='https://m.youtube.com/video_id')
|
|
db_obj.delete_data(url='https://m.youtube.com/video_id',status='done')
|