-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdbHandler.py
180 lines (147 loc) · 8.18 KB
/
dbHandler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import sqlite3
import pandas
#to-do: this entire file will need error handling
try:  # Python 2: text is str or unicode, integers are int or long
    _STRING_TYPES = (basestring,)
    _INT_TYPES = (int, long)
except NameError:  # Python 3: each pair was merged into a single type
    _STRING_TYPES = (str,)
    _INT_TYPES = (int,)


class DataBaseHandler(object):
    """Helper around one SQLite connection with users, metadata and report tables.

    open() must be called before any other method; it creates ``self.db``
    (the connection) and ``self.dbCursor`` (a cursor on it).

    A user is addressed by ``identifier``: an integer is matched against the
    ``ID`` column, a string against the ``nick`` column.

    All *values* are bound as SQL parameters. Column and table names
    (``params``, ``param``, ``table``) cannot be bound, so they are inserted
    into the statement text as-is and must come from trusted code, never from
    user input.
    """

    _USER_NOT_FOUND = "Couldn't find any user with that TG ID or nick!"

    @staticmethod
    def _isInt(value):
        """Return True for a plain integer (bool is deliberately rejected)."""
        # Exact type match: bool subclasses int but is not a valid ID/value here.
        return type(value) in _INT_TYPES

    @staticmethod
    def _isString(value):
        """Return True for any text string."""
        return isinstance(value, _STRING_TYPES)

    def _userFilter(self, identifier):
        """Return (whereClause, boundValue) for identifier, or None if its type is unsupported."""
        if self._isInt(identifier):
            return "ID = ?", identifier
        if self._isString(identifier):
            return "nick = ?", identifier
        return None

    def _existingUserFilter(self, identifier):
        """Return the filter for identifier if a matching user exists, else None."""
        userFilter = self._userFilter(identifier)
        if userFilter is None or self.findUser(identifier).empty:
            return None
        return userFilter

    def updateUserData(self, identifier, params, values):
        """Set the columns ``params`` to ``values`` for one user.

        ``params`` and ``values`` must be non-empty and of equal length; each
        value must be an integer or a string.

        Returns a status message. On success it also contains the previous
        content of the updated columns; on failure it describes the problem
        and nothing is written.
        """
        userFilter = self._existingUserFilter(identifier)
        if userFilter is None:
            return self._USER_NOT_FOUND
        if len(params) == 0 or len(params) != len(values):
            return ("Error while updating SQL Data, parameters and values "
                    "are not of equal length (or of length 0)")
        for value in values:
            if not (self._isInt(value) or self._isString(value)):
                return "Error: Can't update data, value is of unexpected type"

        where, key = userFilter
        columns = ", ".join(params)
        assignments = ", ".join(["%s = ?" % column for column in params])

        # Read the current values first so they can be reported back.
        oldRows = pandas.read_sql_query(
            "SELECT %s FROM users WHERE %s;" % (columns, where),
            self.db, params=(key,))
        output = ""
        if len(oldRows) != 0:
            output = "\nOld data was: \n<code>" + str(oldRows) + "</code>"

        self.dbCursor.execute(
            "UPDATE users SET %s WHERE %s;" % (assignments, where),
            list(values) + [key])
        self.db.commit()
        return "Successfully updated data!" + output

    def loadUserData(self, identifier, params):
        """Read the columns ``params`` of one user.

        Returns the list of row tuples from the query, or an error message
        string if the user does not exist or ``params`` is empty.
        """
        userFilter = self._existingUserFilter(identifier)
        if userFilter is None:
            return self._USER_NOT_FOUND
        if len(params) == 0:
            return "Error while reading SQL Data, list of search parameters is empty"
        where, key = userFilter
        self.dbCursor.execute(
            "SELECT %s FROM users WHERE %s;" % (", ".join(params), where),
            (key,))
        return self.dbCursor.fetchall()

    def addUser(self, id, nick, msg="None", subToReports=0, admin=0):
        """Insert a new user unless findUser(id) already finds one; return a status message."""
        if not self.findUser(id).empty:
            return "User with this TG ID exists already!"
        self.dbCursor.execute(
            "INSERT INTO users (ID, nick, msg, subToReports, admin) "
            "VALUES (?, ?, ?, ?, ?);",
            [id, nick, msg, subToReports, admin])
        self.db.commit()
        return "User successfully added!"

    def rmUser(self, identifier):
        """Delete the user(s) matching identifier; return a status message."""
        userFilter = self._existingUserFilter(identifier)
        if userFilter is None:
            return self._USER_NOT_FOUND
        where, key = userFilter
        self.dbCursor.execute("DELETE FROM users WHERE %s;" % where, (key,))
        self.db.commit()
        return "User successfully deleted!"

    def changeAdmin(self, identifier, value):
        """Set the admin column of the matching user to int(value); return a status message."""
        userFilter = self._existingUserFilter(identifier)
        if userFilter is None:
            return self._USER_NOT_FOUND
        where, key = userFilter
        self.dbCursor.execute(
            "UPDATE users SET admin = ? WHERE %s;" % where,
            (int(value), key))
        self.db.commit()
        return "Admin privileges successfully changed!"

    def isAdmin(self, ID):
        """Return True if ID equals the ID of any user whose admin column is 1."""
        self.dbCursor.execute("SELECT ID FROM users WHERE admin = 1;")
        # Each row is a 1-tuple, hence row[0].
        return any(ID == int(row[0]) for row in self.dbCursor.fetchall())

    def findUser(self, identifier):
        """Return a DataFrame with the ID column of all users matching identifier.

        The frame is empty when nobody matches or when identifier is neither
        an integer nor a string.
        """
        userFilter = self._userFilter(identifier)
        if userFilter is None:
            return pandas.DataFrame()
        where, key = userFilter
        return pandas.read_sql_query(
            "SELECT ID FROM users WHERE %s;" % where, self.db, params=(key,))

    def showUser(self, identifier=0):
        """Return an HTML-tagged text view of one user, or of all users.

        With the integer 0 (the default) every user is listed without the
        custom message. Otherwise the matching user is shown followed by the
        content of the msg column. An error message is returned for an
        unknown user or an unsupported identifier type.
        """
        if self._isInt(identifier) and identifier == 0:
            everyone = pandas.read_sql_query(
                "SELECT ID, nick, subToReports, admin FROM users;", self.db)
            return "<code>" + str(everyone) + "</code>"

        userFilter = self._userFilter(identifier)
        if userFilter is None:
            return "Error: Can't show user, identifier is of unexpected type"
        # Check existence before running the display queries.
        if self.findUser(identifier).empty:
            return self._USER_NOT_FOUND

        where, key = userFilter
        summary = pandas.read_sql_query(
            "SELECT ID, nick, subToReports, admin FROM users WHERE %s;" % where,
            self.db, params=(key,))
        message = pandas.read_sql_query(
            "SELECT msg FROM users WHERE %s;" % where,
            self.db, params=(key,))
        output = "<code>" + str(summary) + "</code>"
        output += ("\nCustom Message:<i>"
                   + message.to_string(header=False, index=False) + "</i>")
        return output

    def loadMetaData(self, param):
        """Return the first column of the first row of ``SELECT param FROM metadata``.

        Raises IndexError if the query yields no row.
        """
        self.dbCursor.execute("SELECT %s FROM metadata;" % (param))
        row = self.dbCursor.fetchone()
        if row is None:
            raise IndexError("metadata query returned no rows")
        return row[0]

    def loadList(self, param, table):
        """Return the first column of every row of ``SELECT param FROM table`` as a list."""
        self.dbCursor.execute("SELECT %s FROM %s;" % (param, table))
        return [row[0] for row in self.dbCursor.fetchall()]

    def updateCastleData(self, castle, result, closeness, gold, points):
        """Store the battle figures for ``castle`` in the report table.

        The four numeric arguments are converted with int() before storing.
        """
        self.dbCursor.execute(
            "UPDATE report SET battleResult = ?, battleCloseness = ?, "
            "gold = ?, points = ? WHERE castle = ?",
            (int(result), int(closeness), int(gold), int(points), castle))
        self.db.commit()

    def loadCastleData(self, castle, param):
        """Return the first row (a tuple) of ``param`` for ``castle`` from the report table.

        Raises IndexError if no row matches.
        """
        self.dbCursor.execute(
            "SELECT %s from report WHERE castle = ?;" % (param), (castle,))
        row = self.dbCursor.fetchone()
        if row is None:
            raise IndexError("report query returned no rows")
        return row

    def updateReportTimeStamp(self, time):
        """Write int(time) into the dateReport column of every metadata row."""
        self.dbCursor.execute("UPDATE metadata SET dateReport = ?;", (int(time),))
        self.db.commit()

    def open(self, name):
        """Connect to the SQLite database ``name`` and create the shared cursor."""
        self.db = sqlite3.connect(name)
        self.dbCursor = self.db.cursor()

    def close(self):
        """Close the database connection."""
        self.db.close()