-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathmanage_db.py
116 lines (88 loc) · 3.16 KB
/
manage_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import datetime
import json
from sqlalchemy import delete, insert, select, update
from models import engine, User
from settings import QuestionThreshold as qt
# user_name column
def create_new_user(user_name: str) -> None:
with engine.connect() as conn:
conn.execute(insert(User).values(
user_name=user_name,
interviews_duration=0,
progress=_get_zero_progress()
)
)
conn.commit()
def get_user_names() -> list[str]:
names = get_users_list()
return [person for name in names for person in name]
def get_users_list() -> list[tuple[str]]:
with engine.connect() as conn:
result = conn.execute(select(User.user_name))
conn.commit()
return result.all()
def delete_this_user(user_name: str) -> None:
with engine.connect() as conn:
conn.execute(delete(User).where(User.user_name == user_name))
conn.commit()
# last_enter_date Column
def get_last_enter_date(user_name: str) -> datetime.datetime:
with engine.connect() as conn:
result = conn.execute(
select(User.last_enter_date).where(
User.user_name == user_name)).first()
conn.commit()
return result[0]
def update_last_enter_date(user_name: str, date) -> None:
with engine.connect() as conn:
conn.execute(
update(User).where(
User.user_name == user_name).values(last_enter_date=date))
conn.commit()
# interview_duration Column
def get_user_interview_duration(user_name: str) -> int:
with engine.connect() as conn:
interview_duration = conn.execute(
select(User.interviews_duration
).where(User.user_name == user_name)
).first()
conn.commit()
return int(interview_duration[0])
def update_interview_duration(user_name: str, duration) -> None:
with engine.connect() as conn:
conn.execute(
update(User).where(
User.user_name == user_name).values(
interviews_duration=duration
)
)
conn.commit()
# progress Column
def get_user_progress(
user_name: str) -> dict[int, bool]:
progress = json.loads(load_user_progress(user_name))
return {
int(question_number): is_rigth
for question_number, is_rigth in progress.items()
}
def load_user_progress(user_name: str) -> str:
with engine.connect() as conn:
result = conn.execute(select(User.progress).where(
User.user_name == user_name))
conn.commit()
return result.all()[0][0]
def update_user_progress(user_name: str, progress: dict) -> None:
with engine.connect() as conn:
conn.execute(
update(User).where(
User.user_name == user_name).values(
progress=json.dumps(progress)))
conn.commit()
# Support functions
def _get_zero_progress() -> str:
return json.dumps(_create_zero_progress())
def _create_zero_progress() -> dict[int, bool]:
return {
question_number: False for question_number
in range(qt.BASIC_FIRST_QUESTION, qt.SQL_LAST_QUESTION + 1)
}