-
Notifications
You must be signed in to change notification settings - Fork 14
/
crud.py
91 lines (69 loc) · 2.47 KB
/
crud.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from time import time
from typing import Optional, Union
from lnbits.db import Database
from lnbits.helpers import urlsafe_short_hash
from loguru import logger
from .models import CreateTposData, LnurlCharge, Tpos, TposClean
db = Database("ext_tpos")
async def create_tpos(data: CreateTposData) -> Tpos:
tpos_id = urlsafe_short_hash()
tpos = Tpos(id=tpos_id, **data.dict())
await db.insert("tpos.pos", tpos)
return tpos
async def get_tpos(tpos_id: str) -> Optional[Tpos]:
return await db.fetchone(
"SELECT * FROM tpos.pos WHERE id = :id", {"id": tpos_id}, Tpos
)
async def start_lnurlcharge(tpos: Tpos) -> LnurlCharge:
now = int(time())
seconds = (
tpos.withdraw_between * 60
if tpos.withdraw_time_option != "secs"
else tpos.withdraw_between
)
last_withdraw = tpos.withdraw_time - now
assert (
last_withdraw < seconds
), f"""
Last withdraw was made too recently, please try again in
{int(seconds - (last_withdraw))} secs
"""
token = urlsafe_short_hash()
await db.execute(
"""
INSERT INTO tpos.withdraws (id, tpos_id)
VALUES (:id, :tpos_id)
""",
{"id": token, "tpos_id": tpos.id},
)
lnurlcharge = await get_lnurlcharge(token)
assert lnurlcharge, "Newly created lnurlcharge couldn't be retrieved"
return lnurlcharge
async def get_lnurlcharge(lnurlcharge_id: str) -> Optional[LnurlCharge]:
return await db.fetchone(
"SELECT * FROM tpos.withdraws WHERE id = :id",
{"id": lnurlcharge_id},
LnurlCharge,
)
async def update_lnurlcharge(charge: LnurlCharge) -> LnurlCharge:
await db.update("tpos.withdraws", charge)
return charge
async def get_clean_tpos(tpos_id: str) -> Optional[TposClean]:
return await db.fetchone(
"SELECT * FROM tpos.pos WHERE id = :id", {"id": tpos_id}, TposClean
)
async def update_tpos(tpos: Tpos) -> Tpos:
await db.update("tpos.pos", tpos)
return tpos
async def get_tposs(wallet_ids: Union[str, list[str]]) -> list[Tpos]:
if isinstance(wallet_ids, str):
wallet_ids = [wallet_ids]
q = ",".join([f"'{wallet_id}'" for wallet_id in wallet_ids])
tposs = await db.fetchall(
f"SELECT * FROM tpos.pos WHERE wallet IN ({q})", model=Tpos
)
logger.debug("tposs")
logger.debug(tposs)
return tposs
async def delete_tpos(tpos_id: str) -> None:
await db.execute("DELETE FROM tpos.pos WHERE id = :id", {"id": tpos_id})