-
Notifications
You must be signed in to change notification settings - Fork 174
/
mgr.py
59 lines (47 loc) · 1.6 KB
/
mgr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# -*- coding: utf-8 -*-
import logging
from typing import Dict, Union

import ntwork
import requests
from ntwork.utils.singleton import Singleton

from exception import ClientNotExists
from utils import generate_guid
class ClientWeWork(ntwork.WeWork):
    """``ntwork.WeWork`` subclass that additionally carries a ``guid``."""

    # Identifier of this client; defaults to the empty string until a
    # caller assigns a real value to the instance.
    guid: str = ""
class ClientManager(metaclass=Singleton):
    """Registry of WeWork clients keyed by guid, with webhook forwarding.

    Every client created through :meth:`create_client` is stored under a
    freshly generated guid and has its events forwarded as JSON to
    ``callback_url`` (when one is configured).

    NOTE(review): ``Singleton`` is imported from ``ntwork.utils.singleton``;
    presumably it makes every ``ClientManager()`` call return the same
    instance -- confirm against the library.
    """

    # guid -> client. Declared on the class, so the mapping is shared by
    # every instance of the manager.
    __client_map: Dict[str, ntwork.WeWork] = {}

    # Webhook that receives client events; forwarding is skipped while empty.
    callback_url: str = ""

    # Seconds to wait for the webhook before giving up. Without a timeout
    # ``requests`` may wait forever, blocking the thread that delivered the
    # event.
    callback_timeout: float = 10.0

    def new_guid(self) -> str:
        """Generate a guid that is not currently used by any registered client.

        Returns:
            A guid produced by ``generate_guid("wework")`` that is absent
            from the client map at the time of the check.
        """
        while True:
            guid = generate_guid("wework")
            # Retry on the (unlikely) collision with an existing client.
            if guid not in self.__client_map:
                return guid

    def create_client(self) -> str:
        """Create a new client, register it and hook up its callbacks.

        Returns:
            The guid under which the new client was registered.
        """
        guid = self.new_guid()
        wework = ClientWeWork()
        wework.guid = guid
        self.__client_map[guid] = wework

        # Register callbacks: one for every message type, one for quit.
        wework.on(ntwork.MT_ALL, self.__on_callback)
        wework.on(ntwork.MT_RECV_WEWORK_QUIT_MSG, self.__on_quit_callback)
        return guid

    def get_client(self, guid: str) -> ntwork.WeWork:
        """Return the client registered under ``guid``.

        Args:
            guid: Identifier previously returned by :meth:`create_client`.

        Returns:
            The registered client; this method never returns ``None``.

        Raises:
            ClientNotExists: If no client is registered under ``guid``.
        """
        try:
            return self.__client_map[guid]
        except KeyError:
            raise ClientNotExists(guid) from None

    def remove_client(self, guid):
        """Forget the client registered under ``guid``.

        Unknown guids are ignored. Only the registry entry is dropped; the
        client object itself is not otherwise touched here.
        """
        self.__client_map.pop(guid, None)

    def __on_callback(self, wework, message):
        """Forward one client event to ``callback_url`` as JSON.

        The payload is ``{"guid": <client guid>, "message": <message>}``.
        Nothing is sent while ``callback_url`` is empty. The response is
        not inspected.

        Args:
            wework: The client that produced the event; its ``guid``
                attribute is read.
            message: The event payload, passed through unchanged.
        """
        if not self.callback_url:
            return

        client_message = {
            "guid": wework.guid,
            "message": message,
        }
        try:
            requests.post(
                self.callback_url,
                json=client_message,
                timeout=self.callback_timeout,
            )
        except requests.RequestException:
            # A slow or unreachable webhook must not raise into whoever
            # dispatched the event; record the failure and carry on.
            logging.getLogger(__name__).exception(
                "Failed to deliver callback for client %s", wework.guid
            )

    def __on_quit_callback(self, wework):
        """Report a client quit through the regular callback path.

        A message with the quit type and an empty ``data`` dict is built
        here, since this handler receives only the client.
        """
        self.__on_callback(wework, {"type": ntwork.MT_RECV_WEWORK_QUIT_MSG, "data": {}})