forked from saddit/fjqndxx-v2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
320 lines (267 loc) · 10.2 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
import base64
import importlib
import json
import logging
import os
import time
import ssl
from functools import wraps
import requests
from exception import KnownException, SendInitException
from proxy_module.proxy_fetcher import ProxyFecher
crypt_name = "sm4"
crypt_mode = "ecb"
sess = requests.session()
sess.verify = False
sess.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36",
"Host": "m.fjcyl.com",
"Referer": "https://m.fjcyl.com/login"
})
ocr_util = None
encryptor = importlib.import_module(
f"crypt_module.{crypt_name}.{crypt_name}_{crypt_mode}")
send_util = {
'enable': False,
'mode': 'fail',
}
def catch_exception(func):
@wraps(func)
def catch(*args, **kwargs):
try:
return func(*args, **kwargs)
except SendInitException as se:
send_util['enable'] = False
error_exit(f'{se}', False)
except KnownException as ke:
error_exit(f'{ke}', False)
except BaseException as e:
error_exit(f"请阅读异常提醒,如果无法解决请截图日志发issue):{e}")
return catch
def init_logger():
logging.getLogger().setLevel(logging.INFO)
logging.basicConfig(format="[%(levelname)s]:%(message)s")
def error_exit(msg: str, trace=True):
send_msg(content=msg, success=False)
if trace:
logging.exception(f"异常信息: {msg}")
else:
logging.error(f"异常信息: {msg}")
exit(-1)
def error_raise(msg: str):
raise KnownException(msg)
# def get_validate_code() -> str:
# max_try = 5
# has_try = 0
# while has_try < max_try:
# resp = sess.get(
# url="https://m.fjcyl.com/validateCode?0.123123&width=58&height=19&num=4")
# try:
# # noinspection PyUnresolvedReferences
# res = ocr_util.get_result(base64.b64encode(resp.content))
# logging.info('获取验证码成功')
# return res
# except Exception as e:
# logging.warning(f'获取验证码失败,原因:{e}')
# logging.warning(f'正在重试, 次数:{has_try}')
# has_try += 1
# time.sleep(1)
# if has_try == max_try:
# error_raise("验证码解析失败,请尝试更换方式或发issue寻求帮助")
def post_study_record():
resp = sess.post(url="https://m.fjcyl.com/studyRecord")
if resp.json().get('success'):
logging.info("学习成功!")
else:
error_raise("学习失败")
def post_login(username: str, pwd: str, pub_key):
post_dict = {
'userName': encryptor.encrypt(username, pub_key),
'pwd': encryptor.encrypt(pwd, pub_key),
# 'validateCode': validate_code
}
resp = sess.post(url="https://m.fjcyl.com/mobileNologin",
data=post_dict)
if resp.status_code == requests.codes.ok:
if resp.json().get('success'):
logging.info(username[-4:] + ' login ' + resp.json().get('errmsg'))
else:
raise ConnectionError(resp.json().get('errmsg'))
else:
error_exit(f'官方服务器发生异常,错误代码:{resp.status_code},信息:{resp.text}')
def get_profile_from_config():
with open('config.json', 'r', encoding='utf-8') as config:
config_json = json.loads(config.read())
username = config_json.get('username')
pwd = config_json.get('pwd')
pub_key = config_json.get('rsaKey').get('public')
api_key = config_json.get('ocr').get('ak')
secret_key = config_json.get('ocr').get('sk')
ocr_type = config_json.get('ocr').get('type')
send_config = config_json.get('send')
accounts = config_json.get("extUsers")
if send_config is not None:
send_type = send_config.get('type')
send_key = send_config.get('key')
send_mode = send_config.get('mode')
return username, pwd, pub_key, \
api_key, secret_key, ocr_type, \
send_type, send_key, send_mode, accounts
def get_profile_from_env():
username = os.environ['username']
pwd = os.environ['password']
pub_key = os.environ['pubKey']
api_key = os.environ['ocrKey']
secret_key = os.environ['ocrSecret']
ocr_type = os.environ['ocrType']
send_type = os.environ['sendType']
send_key = os.environ['sendKey']
send_mode = os.environ['sendMode']
ext_users = os.environ['extUsers']
accounts = []
if ext_users is not None and ext_users.__len__() > 1:
for userLine in ext_users.split('\n'):
usr_split = userLine.split(" ")
account = {
"username": None,
"pwd": None
}
if len(usr_split) > 0:
account['username'] = usr_split[0]
if len(usr_split) > 1:
account['pwd'] = usr_split[1]
accounts.append(account)
return username, pwd, pub_key, \
api_key, secret_key, ocr_type, \
send_type, send_key, send_mode, accounts
def login(username, pwd, pub_key):
max_try = 5
has_try = 0
logging.info(f"正在登录尾号{username[-4:]}")
while has_try < max_try:
# get validate code 经测试不需要验证码
# code = get_validate_code()
# do login
try:
post_login(username, pwd, pub_key)
break
except ConnectionError as e:
logging.error(f'尾号{username[-4:]}登录失败,原因:{e}')
logging.info(f'尝试重新登录,重试次数{has_try}')
has_try += 1
time.sleep(1)
if has_try == max_try:
error_raise(f"尾号{username[-4:]}尝试登录失败")
# def init_ocr(ocr_type: str, ak: str, sk: str):
# global ocr_util
# if ocr_type is None or ocr_type == '':
# ocr_type = "baidu_image"
# try:
# ocr_util = importlib.import_module(
# f"ocr_module.{ocr_type}.{ocr_type}_ocr")
# except ModuleNotFoundError:
# error_exit("ocr类型不存在,请更换类型")
# if ocr_util.is_need_keys():
# ocr_util.set_keys(ak, sk)
# logging.info(f"使用 OCR {ocr_type}")
def init_sender(send_type, send_key, send_mode):
if send_type is None or send_type == '':
return
if send_key is None or send_key == '':
raise SendInitException('缺少配置信息: send_key')
else:
try:
send_util['sender'] = importlib.import_module(
f"send_module.{send_type}.sender")
except ModuleNotFoundError:
raise SendInitException("消息推送类型不存在,请更换类型")
send_util['enable'] = True
send_util['sender'].set_key(send_key)
if send_mode is not None and send_mode != "":
send_util['mode'] = send_mode
def send_msg(content, success=True):
if not send_util['enable']:
return
if send_util['mode'] == 'both' \
or (send_util['mode'] == 'fail' and not success) \
or (send_util['mode'] == 'success' and success):
res = send_util['sender'].send(title="青年大学习打卡",
content=f"**状态:** {'成功' if success else '失败'}\n\n"
f"**信息:** {content}")
if not res['success']:
logging.warning(f"消息推送失败,原因:{res['message']}")
else:
logging.info(f"消息推送成功")
def multi_study(accounts, pub_key):
logging.info(f"开始多人打卡,人数{len(accounts)}")
push_msg = ""
all_success = True
for account in accounts:
if account['username'] is None or account['pwd'] is None:
logging.warning("多人打卡配置存在错误,将跳过部分用户,请检查配置的用户名密码格式")
continue
try:
login(account['username'], account['pwd'], pub_key)
post_study_record()
push_msg += f"尾号{account['username'][-4:]}打卡成功\n"
except KnownException as e:
push_msg += f"尾号{account['username'][-4:]}失败:{e}\n"
all_success = False
push_msg += "全部打卡成功" if all_success else "部分打卡失败"
send_msg(push_msg, all_success)
def single_study(username, password, pub_key):
logging.info("开始单人打卡")
# do login
login(username, password, pub_key)
# do study
post_study_record()
# send success message
send_msg("打卡成功")
@catch_exception
def run(use_config: bool):
logging.info("自动学习开始")
# get default config
username, pwd, pub_key, \
api_key, secret_key, ocr_type, \
send_type, send_key, send_mode, \
accounts = get_profile_from_config() if use_config else get_profile_from_env()
# init ocr module 不需要 ocr 了
# init_ocr(ocr_type, api_key, secret_key)
# init sender
init_sender(send_type, send_key, send_mode)
# study proc
if accounts is not None and len(accounts) > 0:
if pwd is not None and username is not None and pwd != "" and username != "":
accounts.append({"username": username, "pwd": pwd})
multi_study(accounts, pub_key)
else:
single_study(username, pwd, pub_key)
def init_proxy():
logging.info("正在尝试使用代理IP")
proxy = ProxyFecher()
while not proxy.empty():
ip = proxy.random_pop()
sess.proxies = {'https': f"http://{ip}"}
try:
try:
logging.info(f"正在测试 http://{ip}")
sess.get("https://m.fjcyl.com")
except BaseException:
logging.info(f"正在测试 https://{ip}")
sess.proxies = {'https': f"https://{ip}"}
sess.get("https://m.fjcyl.com")
logging.info(f"测试成功,使用{ip}代理请求")
return
except BaseException:
logging.info(f"{ip} 不可用")
error_exit("找不到可用代理IP", False)
def start_with_workflow():
init_logger()
logging.info("你正在使用GitHubAction,请确保secret已经配置")
init_proxy()
run(False)
if __name__ == '__main__':
init_logger()
logging.info("你正在使用本地服务,请确保填写了配置文件")
init_proxy()
run(True)