forked from FrightenedFox/psb-project-sql-injection
-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.py
164 lines (144 loc) · 6.16 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import configparser
import logging
import pandas as pd
import psycopg2
import sqlalchemy
# Module-level settings, loaded once at import time from 'config.ini'.
# The path is relative, so it is resolved against the current working
# directory. ConfigParser.read() skips files it cannot open instead of
# raising, so a missing file only surfaces later as a KeyError when a
# section (e.g. "postgresql-dellstore2" in the __main__ block) is looked up.
config = configparser.ConfigParser()
config.read('config.ini')
class DellStoreDB:
    """Makes the communication with the ``dellstore2`` database easier.

    An instance owns one psycopg2 connection (``conn``) plus a SQLAlchemy
    engine (``sqlalchemy_engine``) that hands out that very same connection
    and is used for the pandas-based read in ``get_products``.

    WARNING: ``get_user``, ``insert_user`` and ``get_products`` build their
    SQL by interpolating the caller's strings directly into the statement.
    ``get_user`` documents this as intentional ("with injection"); the other
    two presumably follow the same demo purpose. Never reuse these methods
    with untrusted input outside of that demonstration.
    """

    def __init__(self):
        # Raw psycopg2 connection; None while disconnected.
        self.conn = None
        # Set to True by connect() once `conn` exists, reset by disconnect().
        self.is_connected = False
        # Engine wrapped around `conn`; created in connect(), None otherwise.
        self.sqlalchemy_engine: sqlalchemy.engine.base.Engine | None = None

    def connect(self, **kwargs):
        """Connect to the PostgreSQL database server.

        Failures are logged (with traceback) instead of being raised, so
        callers must inspect ``is_connected`` afterwards.

        :param kwargs: forwarded unchanged to ``psycopg2.connect``.
        """
        try:
            logging.info("Connecting to the PostgreSQL database...")
            self.conn = psycopg2.connect(**kwargs)
            # The engine never opens connections of its own: its creator
            # always returns the psycopg2 connection made above.
            self.sqlalchemy_engine = sqlalchemy.create_engine(
                'postgresql+psycopg2://',
                creator=lambda: self.conn)
            # No explicit commit() exists anywhere in this class, so every
            # statement has to take effect immediately. PostgreSQL also
            # rejects CREATE/DROP DATABASE inside a transaction block.
            self.conn.autocommit = True
            # Display PostgreSQL version
            cur = self.conn.cursor()
            try:
                cur.execute('SELECT version()')
                logging.info(f"PostgreSQL version:\t{cur.fetchone()}")
            finally:
                # Release the cursor even when the version probe fails.
                cur.close()
        except Exception:
            # psycopg2.DatabaseError derives from Exception, so one clause
            # covers both driver errors and everything else.
            logging.exception('')
        if self.conn is not None:
            self.is_connected = True

    def disconnect(self):
        """Disconnect from the PostgreSQL database server.

        Safe to call when no connection exists; that case is only logged.
        """
        if self.conn is not None:
            self.conn.close()
            # The engine is missing when connect() opened the raw connection
            # but failed before/while building the engine.
            if self.sqlalchemy_engine is not None:
                self.sqlalchemy_engine.dispose()
                # Drop the disposed engine so it cannot be used by mistake.
                self.sqlalchemy_engine = None
            self.conn = None
            logging.info("Database connection closed.")
            self.is_connected = False
        else:
            # executes when there was no connection
            logging.warning("Database was asked to be closed, but there was no connection.")
            logging.warning(f"self.is_connected set to False (before it was {self.is_connected}).")
            self.is_connected = False

    def create_db(self):
        """Create the ``dellstore2`` database.

        The owner role ``sqlinjection`` and tablespace ``dbspace`` named in
        the statement must already exist on the server. Errors propagate.
        """
        cur = self.conn.cursor()
        try:
            cur.execute("CREATE DATABASE dellstore2 OWNER sqlinjection TABLESPACE dbspace;")
            logging.info("Database dellstore2 has been created.")
        finally:
            cur.close()

    def fill_db(self):
        """Populate the database by executing the bundled SQL script.

        The script path is relative to the current working directory.

        :return: (bool, str) : Succeeded?, Message. On failure the message is
            the exception text; on success it is a user-facing Polish string
            ("The tables were successfully filled with data.").
        """
        cur = self.conn.cursor()
        logging.info("Starting to populate dellstore2 db tables with data...")
        try:
            with open("psb_project/dellstore2/dellstore2-normal-1.0.sql", "r") as sql_script:
                cur.execute(sql_script.read())
        except Exception as e:
            logging.error(str(e))
            return False, str(e)
        finally:
            # Runs on both the success and the failure path.
            cur.close()
        logging.info("Tables where populated successfully.")
        return True, "Tabele zostały pomyślnie wypełnione danymi."

    def drop_tables(self):
        """Drop the dellstore2 tables and the ``new_customer`` function.

        Every statement uses ``IF EXISTS``, so objects that are already gone
        are skipped. Errors propagate.
        """
        cur = self.conn.cursor()
        try:
            cur.execute("DROP TABLE IF EXISTS public.categories ;"
                        "DROP TABLE IF EXISTS public.cust_hist; "
                        "DROP TABLE IF EXISTS public.inventory; "
                        "DROP TABLE IF EXISTS public.orderlines; "
                        "DROP TABLE IF EXISTS public.orders; "
                        "DROP TABLE IF EXISTS public.customers; "
                        "DROP TABLE IF EXISTS public.products; "
                        "DROP TABLE IF EXISTS public.reorder; "
                        "DROP FUNCTION IF EXISTS public.new_customer;")
        finally:
            cur.close()

    def drop_db(self):
        """Drop the ``dellstore2`` database if it exists. Errors propagate."""
        cur = self.conn.cursor()
        try:
            cur.execute("DROP DATABASE IF EXISTS dellstore2;")
            logging.info("Database dellstore2 has been dropped.")
        finally:
            cur.close()

    def get_user(self, username: str, password: str):
        """User login query (with injection).

        The credentials are interpolated into the SQL text on purpose; this
        method is the injectable login of the demo and must stay that way.

        :param username: value compared against ``customers.username``
        :param password: value compared against ``customers.password``
        :return: (bool, bool, str) : Logged in?, With error?, Error message
        """
        ret_text = ""
        cur = self.conn.cursor()
        try:
            cur.execute(f"SELECT * "
                        f"FROM dellstore2.public.customers "
                        f"WHERE username='{username}' AND password='{password}';")
            # Fetch inside the try: an injected statement may yield no result
            # set, which makes fetchone() raise; report that as an error too.
            ans = cur.fetchone()
        except Exception as e:
            ret_text = str(e)
            logging.error(ret_text)
            return False, True, ret_text
        finally:
            cur.close()
        # NOTE(review): this writes the plaintext password to the debug log.
        logging.debug(f"Login as {username=} {password=}")
        # Any returned row counts as a successful login.
        return ans is not None, False, ret_text

    def insert_user(self, username: str, password: str, firstname: str, lastname: str):
        """Register a new customer (values are interpolated, not bound).

        ``ON CONFLICT DO NOTHING`` makes the statement return no row when the
        insert is skipped, which is reported as ``False`` without an error
        message.

        :param username: stored in ``customers.username``
        :param password: stored in ``customers.password`` as given
        :param firstname: stored in ``customers.firstname``
        :param lastname: stored in ``customers.lastname``
        :return: (bool, str) : Inserted?, Error message (empty if no error)
        """
        ret_text = ""
        cur = self.conn.cursor()
        try:
            cur.execute(f"INSERT INTO dellstore2.public.customers (firstname, lastname, username, password) "
                        f"VALUES ('{firstname}', '{lastname}', '{username}', '{password}') "
                        f"on CONFLICT do NOTHING "
                        f"RETURNING (xmax = 0) AS inserted;")
            # Kept inside the try so a failing fetch is reported through the
            # return value instead of escaping as an exception.
            ans = cur.fetchone()
        except Exception as e:
            ret_text = str(e)
            logging.error(ret_text)
            return False, ret_text
        finally:
            cur.close()
        # NOTE(review): this writes the plaintext password to the debug log.
        logging.debug(f"Registered user:{firstname=} {lastname=} {username=} {password=}")
        return ans is not None, ret_text

    def get_products(self, user_query: str):
        """Search products whose actor or title contains ``user_query``.

        The search term is interpolated into the SQL text and upper-cased by
        the database before the ``LIKE`` comparison (presumably the stored
        actor/title values are upper case - verify against the data set).

        :param user_query: raw search text typed by the user
        :return: (DataFrame, bool, str) : Result rows, With error?, Error
            message. On error the DataFrame is empty.
        """
        ret_text = ""
        sql_query = sqlalchemy.text(f"SELECT * "
                                    f"FROM dellstore2.public.products "
                                    f"WHERE (actor LIKE upper('%{user_query}%')) "
                                    f" OR (title LIKE upper('%{user_query}%'));")
        try:
            df = pd.read_sql(sql_query, self.sqlalchemy_engine)
        except Exception as e:
            ret_text = str(e)
            logging.error(ret_text)
            return pd.DataFrame(), True, ret_text
        logging.debug(f"Search result: {df.shape=}")
        return df, False, ret_text
if __name__ == "__main__":
    # Manual smoke test: connect with the "postgresql-dellstore2" settings
    # from config.ini, register one sample user and print the result tuple.
    logging.basicConfig(level=logging.DEBUG)
    db = DellStoreDB()
    db.connect(**config["postgresql-dellstore2"])
    try:
        print(db.insert_user("vmorskyi", "123456", "vitalii", "morskyi"))
    finally:
        # Always release the connection, even if the insert raises.
        db.disconnect()