-
Notifications
You must be signed in to change notification settings - Fork 14
/
solvers.py
134 lines (111 loc) · 4.81 KB
/
solvers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import re
import time
from urllib.parse import quote
from bs4 import BeautifulSoup
from settings import settings
class SolverTurnsile:
    """Solve the Turnstile captcha of a contest thread and submit the participation.

    The expected call order, as far as this class shows, is
    ``on_before_request`` -> ``solve`` -> ``participate`` and then either
    ``on_failure`` or ``on_success``.

    ``puser`` is the object used for all I/O; this class only relies on its
    ``logger`` attribute and its ``makerequest`` method.
    """

    # Prize line of contests that are skipped on purpose.
    # https://youtu.be/FBdFhgWYEjM
    _USELESS_PRIZE_RE = re.compile(r"\s*Приз:\s+Слив фотографий")

    # Offsets (seconds) added to time.time() when writing an entry into
    # settings._expire_blacklist.
    _USELESS_BLACKLIST_SECONDS = 30000000000
    _FAILURE_BLACKLIST_SECONDS = 300000

    # Polling of the captcha service result endpoint: seconds between polls
    # and the maximum number of polls before giving up (60 * 5 s = 5 min).
    _POLL_INTERVAL_SECONDS = 5
    _MAX_POLL_ATTEMPTS = 60

    def __init__(self, puser):
        """Remember ``puser`` and reset the per-thread state."""
        self.puser = puser
        self.id = -1
        self.turnsile_response = None
        self.request_time = None

    def on_before_request(self, id) -> bool:
        """Store the thread id that the following calls operate on.

        The parameter keeps its original name ``id`` (shadowing the builtin)
        so that keyword callers keep working. Always returns True.
        """
        self.id = id
        return True

    @classmethod
    def _is_useless_prize(cls, text: str) -> bool:
        """Return True when ``text`` starts with the prize line that is skipped."""
        return cls._USELESS_PRIZE_RE.match(text) is not None

    def solve(self, contest_soup: "BeautifulSoup") -> bool:
        """Collect everything ``participate`` needs from the parsed thread page.

        Sleeps for ``settings.solve_time`` seconds, filters out contests with
        the skipped prize, reads the ``request_time`` form value and requests
        a Turnstile token from the captcha service.

        Returns True when both ``self.request_time`` and
        ``self.turnsile_response`` were obtained, False otherwise.
        """
        time.sleep(settings.solve_time)

        contest_captcha = contest_soup.find("div", class_="ContestCaptcha")
        if contest_captcha is None:
            self.puser.logger.warning("Couldn't get ContestCaptcha. Lag or contest is over?")
            return False

        # The prize filter is optional: when the message body or the contest
        # block is not on the page, skip the filter instead of crashing.
        message_content = contest_soup.find("div", {"class": "messageContent"})
        contest_thread_block = None
        if message_content is not None:
            contest_thread_block = message_content.find("div", {"class": "contestThreadBlock"})
        if contest_thread_block is not None:
            for contest_info in contest_thread_block.find_all(
                "div", {"class": "marginBlock"}, recursive=False
            ):
                if self._is_useless_prize(contest_info.text):
                    settings._expire_blacklist[self.id] = (
                        time.time() + self._USELESS_BLACKLIST_SECONDS
                    )
                    self.puser.logger.notice("saved your ass from a useless contest")
                    return False

        # Store the value of the hidden input, not the tag itself: str() of a
        # tag is its HTML markup, which is not what the form field should carry.
        request_time_input = contest_captcha.find(
            "input", {"name": "request_time"}, recursive=False
        )
        self.request_time = (
            None if request_time_input is None else request_time_input.get("value")
        )
        if self.request_time is None:
            self.puser.logger.warning("request_time is missing.")
            return False

        self.turnsile_response = self.request_turnsile_solve()
        return self.turnsile_response is not None

    def request_turnsile_solve(self):
        """Ask the captcha service for a Turnstile token and wait for the result.

        Submits the task to ``in.php`` and then polls ``res.php`` every
        ``_POLL_INTERVAL_SECONDS`` seconds, at most ``_MAX_POLL_ATTEMPTS`` times.

        Returns the token (the ``request`` field of a status-1 answer), or
        None when the submit fails, the service reports the captcha as
        unsolvable, or no answer arrives within the polling budget.

        Raises:
            RuntimeError: the service answered with a status/request
                combination this method does not know.
        """
        params = {
            'key': settings.anti_captcha_key,
            'method': "turnstile",
            'sitekey': settings.site_key,
            'pageurl': settings.lolz_url + "threads/" + str(self.id) + "/",
            'json': 1,
        }
        if settings.send_referral_to_creator:
            params["softguru"] = 109978

        submitresp = self.puser.makerequest("GET", "https://api.captcha.guru/in.php", params=params)
        if submitresp is None:
            self.puser.logger.warning("couldn't send turnsile solve request")
            return None

        submit = submitresp.json()
        self.puser.logger.debug(submit)
        if submit["status"] == 0:
            self.puser.logger.warning("turnsile captcha submit was unsuccessful")
            return None

        # Bounded polling: an unreachable or stuck service must not hang the
        # caller forever.
        for _ in range(self._MAX_POLL_ATTEMPTS):
            time.sleep(self._POLL_INTERVAL_SECONDS)
            resp = self.puser.makerequest(
                "GET",
                "https://api.captcha.guru/res.php",
                params={
                    'key': settings.anti_captcha_key,
                    'action': "get",
                    'id': submit["request"],
                    'json': 1,
                },
            )
            if resp is None:
                self.puser.logger.warning("turnsile solve fetch failed")
                continue

            answer = resp.json()
            self.puser.logger.debug(answer)

            status = answer["status"]
            if status == 1:
                return answer["request"]
            if status == 0:
                if answer["request"] == "CAPCHA_NOT_READY":
                    continue
                if answer["request"] == "ERROR_CAPTCHA_UNSOLVABLE":
                    self.puser.logger.warning("service failed to solve captcha")
                    return None
            raise RuntimeError("unknown response from captcha solver")

        self.puser.logger.warning("turnsile solve timed out")
        return None

    def participate(self, csrf: str):
        """POST the participation form for the current thread.

        Args:
            csrf: value sent as the ``_xfToken`` form field.

        Returns:
            The decoded JSON body of the response, or None when
            ``makerequest`` returned None.

        Raises:
            RuntimeError: ``solve`` has not populated ``turnsile_response``
                and ``request_time`` yet.
        """
        if self.turnsile_response is None or self.request_time is None:
            raise RuntimeError("turnsile_response or request_time is none when participating")

        response = self.puser.makerequest(
            "POST",
            settings.lolz_url + "threads/" + str(self.id) + "/participate",
            params={"cf-turnstile-response": self.turnsile_response},
            data={
                'request_time': str(self.request_time),
                'cf-turnstile-response': self.turnsile_response,
                '_xfRequestUri': quote("/threads/" + str(self.id) + "/"),
                '_xfNoRedirect': 1,
                '_xfToken': csrf,
                '_xfResponseType': "json",
            },
            timeout=12.05,
            retries=3,
            checkforjs=True
        )
        if response is None:
            return None
        return response.json()

    def on_failure(self, response):
        """Log the failed participation and blacklist the thread id for a while."""
        self.puser.logger.error("%d didn't participate (why lol?): %s", self.id, str(response))
        settings._expire_blacklist[self.id] = time.time() + self._FAILURE_BLACKLIST_SECONDS

    def on_success(self, response):
        """Log the response of a successful participation at debug level."""
        self.puser.logger.debug("%s", str(response))