-
Notifications
You must be signed in to change notification settings - Fork 1
/
mongo.py
142 lines (119 loc) · 4.19 KB
/
mongo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import motor.motor_asyncio
from settings import MONGO_URL
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient # type: ignore
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Dict, Optional
from aiogram import Bot
from aiogram.fsm.state import State
from aiogram.fsm.storage.base import BaseStorage, StateType, StorageKey, DEFAULT_DESTINY
mongo_client = motor.motor_asyncio.AsyncIOMotorClient(MONGO_URL)
db = mongo_client.omi_telebot
users_collection = db.users
STATE = "aiogram_state"
DATA = "aiogram_data"
BUCKET = "aiogram_bucket"
COLLECTIONS = (STATE, DATA, BUCKET)
class MongoStorage(BaseStorage):
"""
Mongo storage required :code:`motor` package installed (:code:`pip install motor`)
"""
def __init__(
self,
mongo: AsyncIOMotorClient,
db_name: str = "aiogram_fsm",
with_bot_id: bool = True,
with_destiny: bool = True,
) -> None:
"""
:param mongo: Instance of mongo connection
:param with_bot_id: include Bot id in the database
:param with_destiny: include destiny in the database
"""
self._mongo = mongo
self._db = mongo.get_database(db_name)
self._with_bot_id = with_bot_id
self._with_destiny = with_destiny
self._lock = asyncio.Lock()
@classmethod
def from_url(
cls,
url: str,
db_name: str = "aiogram_fsm",
with_bot_id: bool = True,
with_destiny: bool = True,
) -> "MongoStorage":
"""
Create an instance of :class:`MongoStorage` with specifying the connection string
:param url: for example :code:`mongodb://user:password@host:port`
:param db_name: name of database to store aiogram data`
:param with_bot_id: include Bot id in the database
:param with_destiny: include destiny in the database
"""
return cls(
mongo=AsyncIOMotorClient(url),
db_name=db_name,
with_bot_id=with_bot_id,
with_destiny=with_destiny,
)
def _get_db_filter(self, key: StorageKey) -> Dict[str, Any]:
db_filter: Dict[str, Any] = {"chat": key.chat_id, "user": key.user_id}
if self._with_bot_id:
db_filter["bot_id"] = key.bot_id
if self._with_destiny:
db_filter["destiny"] = key.destiny
elif key.destiny != DEFAULT_DESTINY:
raise ValueError(
"Mongo storage is not configured to use key destiny other the default.\n"
"\n"
"Probably, you should set `with_destiny=True` in for MongoStorage.\n"
"E.g: `MongoStorage(mongo_client, ..., with_destiny=True)`"
)
return db_filter
async def close(self) -> None:
self._mongo.close()
@asynccontextmanager
async def lock(
self,
bot: Bot,
key: StorageKey,
) -> AsyncGenerator[None, None]:
async with self._lock:
yield None
async def set_state(
self,
key: StorageKey,
state: StateType = None,
) -> None:
if state is None:
await self._db[STATE].delete_one(filter=self._get_db_filter(key))
else:
await self._db[STATE].update_one(
filter=self._get_db_filter(key),
update={"$set": {"state": state.state if isinstance(state, State) else state}},
upsert=True,
)
async def get_state(
self,
key: StorageKey,
) -> Optional[str]:
result = await self._db[STATE].find_one(filter=self._get_db_filter(key))
return result.get("state") if result else None
async def set_data(
self,
key: StorageKey,
data: Dict[str, Any],
) -> None:
await self._db[DATA].update_one(
filter=self._get_db_filter(key),
update={"$set": {"data": data}},
upsert=True,
)
async def get_data(
self,
key: StorageKey,
) -> Dict[str, Any]:
result: Optional[Dict[str, Dict[str, Any]]] = await self._db[DATA].find_one(
filter=self._get_db_filter(key)
)
return result.get("data") or {} if result else {}