-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathauto_api.py
78 lines (61 loc) · 2.48 KB
/
auto_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from config import api_list
from tools import create_redis_key, get, get_localtime, post, timeDelay
import redis
class AutoApi:
def __init__(self, conf) -> None:
self.client_id = conf.get("client_id")
self.secret = conf.get("secret")
self.redirect_uri = conf.get("redirect_uri")
self.redis_key = create_redis_key(conf.get("redis_key"))
self.refresh_token = conf.get("refresh_token")
self.redis = redis.StrictRedis(
host=conf.get("redis_host"),
port=conf.get("redis_port"),
password=conf.get("redis_password"),
)
def test_key_exists(self):
return self.redis.exists(self.redis_key)
def get_token(self):
url = "https://login.microsoftonline.com/common/oauth2/v2.0/token"
headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = {
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
"client_id": self.client_id,
"client_secret": self.secret,
"redirect_uri": self.redirect_uri,
}
try:
rep = post(url, data=data, headers=headers)
if "error" in rep:
raise Exception(rep["error_description"])
else:
return [rep.get("access_token"), rep.get("refresh_token")]
except Exception as ex:
print(f"[Error] {ex}")
def start(self):
if self.test_key_exists() == 0:
if self.refresh_token == "":
print("Refresh token is empty. :(")
return
self.redis.set(self.redis_key, self.refresh_token)
self.refresh_token = self.redis.get(self.redis_key) # overlay config
tokens = self.get_token()
self.redis.set(self.redis_key, tokens[1]) # update token
headers = {
"Authorization": tokens[0],
"Content-Type": "application/json",
}
for i in range(3):
print((f"开始第{i + 1}次测试").center(20, "#"))
try:
for idx, API in enumerate(api_list):
if get(API, headers) == 200:
print(f"第{idx + 1}次调用成功")
else:
print(f"第{idx + 1}次调用失败")
timeDelay(1, 4)
print(f"[Info] time: {get_localtime()}")
except Exception as ex:
print(f"[Error] {ex}")
timeDelay(1, 4)