Skip to content

Commit

Permalink
update ban detection; fix cli relogin command to relogin only selected accounts
Browse files Browse the repository at this point in the history
  • Loading branch information
vladkens committed Jan 5, 2024
1 parent 5fc84d8 commit d78e33d
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 52 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ classifiers = [
]
dependencies = [
"aiosqlite>=0.17.0",
"fake-useragent>=1.3.0",
"fake-useragent>=1.4.0",
"httpx>=0.24.0",
"loguru>=0.7.0",
]
Expand Down
22 changes: 15 additions & 7 deletions twscrape/accounts_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,14 @@ async def login(self, account: Account, email_first: bool = False):
finally:
await self.save(account)

async def login_all(self, email_first=False):
qs = "SELECT * FROM accounts WHERE active = false AND error_msg IS NULL"
rs = await fetchall(self._db_file, qs)
async def login_all(self, email_first=False, usernames: list[str] | None = None):
if usernames is None:
qs = "SELECT * FROM accounts WHERE active = false AND error_msg IS NULL"
else:
us = ",".join([f'"{x}"' for x in usernames])
qs = f"SELECT * FROM accounts WHERE username IN ({us})"

rs = await fetchall(self._db_file, qs)
accounts = [Account.from_rs(rs) for rs in rs]
# await asyncio.gather(*[login(x) for x in self.accounts])

Expand Down Expand Up @@ -176,7 +180,7 @@ async def relogin(self, usernames: str | list[str], email_first=False):
"""

await execute(self._db_file, qs)
await self.login_all(email_first=email_first)
await self.login_all(email_first=email_first, usernames=usernames)

async def relogin_failed(self, email_first=False):
qs = "SELECT username FROM accounts WHERE active = false AND error_msg IS NOT NULL"
Expand Down Expand Up @@ -248,13 +252,17 @@ async def get_for_queue(self, queue: str):

return Account.from_rs(rs) if rs else None

async def get_for_queue_or_wait(self, queue: str) -> Account:
async def get_for_queue_or_wait(self, queue: str) -> Account | None:
msg_shown = False
while True:
account = await self.get_for_queue(queue)
if not account:
if not msg_shown:
nat = await self.next_available_at(queue)
if not nat:
logger.warning("No active accounts. Stopping...")
return None

msg = f'No account available for queue "{queue}". Next available at {nat}'
logger.info(msg)
msg_shown = True
Expand Down Expand Up @@ -283,9 +291,9 @@ async def next_available_at(self, queue: str):
at_local = datetime.now() + (trg - now)
return at_local.strftime("%H:%M:%S")

return "none"
return None

async def mark_banned(self, username: str, error_msg: str):
async def mark_inactive(self, username: str, error_msg: str | None):
qs = """
UPDATE accounts SET active = false, error_msg = :error_msg
WHERE username = :username
Expand Down
1 change: 1 addition & 0 deletions twscrape/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async def main(args):

if args.command == "add_accounts":
await pool.load_from_file(args.file_path, args.line_format)
print("\nNow run:\ntwscrape login_accounts")
return

if args.command == "del_accounts":
Expand Down
2 changes: 1 addition & 1 deletion twscrape/imap.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

_env = dict(os.environ)

LOGIN_CODE_TIMEOUT = int_or(_env, "LOGIN_CODE_TIMEOUT") or 40
LOGIN_CODE_TIMEOUT = int_or(_env, "LOGIN_CODE_TIMEOUT") or 30


class EmailLoginError(Exception):
Expand Down
2 changes: 1 addition & 1 deletion twscrape/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from loguru import logger

_LEVELS = Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
_LEVELS = Literal["TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
_LOG_LEVEL: _LEVELS = "INFO"


Expand Down
98 changes: 56 additions & 42 deletions twscrape/queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@ def __init__(self, acc: Account, clt: httpx.AsyncClient):
self.req_count = 0


class RateLimitError(Exception):
pass


class BannedError(Exception):
class HandledError(Exception):
pass


Expand Down Expand Up @@ -82,16 +78,16 @@ async def __aenter__(self):
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._close_ctx()

async def _close_ctx(self, reset_at=-1, banned=False, msg=""):
async def _close_ctx(self, reset_at=-1, inactive=False, msg: str | None = None):
if self.ctx is None:
return

ctx, self.ctx, self.req_count = self.ctx, None, 0
username = ctx.acc.username
await ctx.clt.aclose()

if banned:
await self.pool.mark_banned(username, msg)
if inactive:
await self.pool.mark_inactive(username, msg)
return

if reset_at > 0:
Expand Down Expand Up @@ -123,60 +119,75 @@ async def _check_rep(self, rep: httpx.Response) -> None:
except json.JSONDecodeError:
res: Any = {"_raw": rep.text}

limit_remaining = int(rep.headers.get("x-rate-limit-remaining", -1))
limit_reset = int(rep.headers.get("x-rate-limit-reset", -1))
# limit_max = int(rep.headers.get("x-rate-limit-limit", -1))

err_msg = "OK"
if "errors" in res:
err_msg = set([f'({x.get("code", -1)}) {x["message"]}' for x in res["errors"]])
err_msg = "; ".join(list(err_msg))

if self.debug:
fn = logger.debug if rep.status_code == 200 else logger.warning
fn(f"{rep.status_code:3d} - {req_id(rep)} - {err_msg}")
log_msg = f"{rep.status_code:3d} - {req_id(rep)} - {err_msg}"
print(log_msg)
logger.trace(log_msg)

# need to add some features in api.py
# for dev: need to add some features in api.py
if err_msg.startswith("(336) The following features cannot be null"):
logger.error(f"Update required: {err_msg}")
logger.error(f"[DEV] Update required: {err_msg}")
exit(1)

# general api rate limit
if int(rep.headers.get("x-rate-limit-remaining", -1)) == 0:
await self._close_ctx(int(rep.headers.get("x-rate-limit-reset", -1)))
raise RateLimitError(err_msg)
if limit_remaining == 0 and limit_reset > 0:
logger.debug(f"Rate limited: {log_msg}")
await self._close_ctx(limit_reset)
raise HandledError()

# possible new limits for tweets view per account
if err_msg.startswith("(88) Rate limit exceeded") or rep.status_code == 429:
await self._close_ctx(utc.ts() + 60 * 60 * 4) # lock for 4 hours
raise RateLimitError(err_msg)
# no way to check is account banned in direct way, but this check should work
if err_msg.startswith("(88) Rate limit exceeded") and limit_remaining > 0:
logger.warning(f"Ban detected: {log_msg}")
await self._close_ctx(-1, inactive=True, msg=err_msg)
raise HandledError()

if err_msg.startswith("(326) Authorization: Denied by access control"):
await self._close_ctx(-1, banned=True, msg=err_msg)
raise BannedError(err_msg)

# Something from twitter side, abort request so it doesn't hang
# https://github.com/vladkens/twscrape/pull/80
if err_msg.startswith("(131) Dependency: Internal error."):
logger.warning(f"Ban detected: {log_msg}")
await self._close_ctx(-1, inactive=True, msg=err_msg)
raise HandledError()

if err_msg.startswith("(32) Could not authenticate you"):
logger.warning(f"Session expired or banned: {log_msg}")
await self._close_ctx(-1, inactive=True, msg=err_msg)
raise HandledError()

if err_msg == "OK" and rep.status_code == 403:
logger.warning(f"Session expired or banned: {log_msg}")
await self._close_ctx(-1, inactive=True, msg=None)
raise HandledError()

# something from twitter side - abort all queries, see: https://github.com/vladkens/twscrape/pull/80
if err_msg.startswith("(131) Dependency: Internal error"):
logger.warning(f"Dependency error (request skipped): {err_msg}")
raise AbortReqError()

# possible banned by old api flow
if rep.status_code in (401, 403):
await self._close_ctx(utc.ts() + 60 * 60 * 12) # lock for 12 hours
raise RateLimitError(err_msg)

# content not found
if rep.status_code == 200 and "_Missing: No status found with that ID." in err_msg:
if rep.status_code == 200 and "_Missing: No status found with that ID" in err_msg:
return # ignore this error

# Something from twitter side, just ignore it
# https://github.com/vladkens/twscrape/pull/95
# something from twitter side - just ignore it, see: https://github.com/vladkens/twscrape/pull/95
if rep.status_code == 200 and "Authorization" in err_msg:
logger.warning(f"Authorization unknown error: {err_msg}")
logger.warning(f"Authorization unknown error: {log_msg}")
return

if err_msg != "OK":
logger.warning(f"API unknown error: {err_msg}")
logger.warning(f"API unknown error: {log_msg}")
return # ignore any other unknown errors

rep.raise_for_status()
try:
rep.raise_for_status()
except httpx.HTTPStatusError:
logger.error(f"Unhandled API response code: {log_msg}")
await self._close_ctx(utc.ts() + 60 * 15) # 15 minutes
raise HandledError()

async def get(self, url: str, params: ReqParams = None):
return await self.req("GET", url, params=params)
Expand All @@ -185,6 +196,8 @@ async def req(self, method: str, url: str, params: ReqParams = None) -> httpx.Re
retry_count = 0
while True:
ctx = await self._get_ctx()
if ctx is None:
return None

try:
rep = await ctx.clt.request(method, url, params=params)
Expand All @@ -194,16 +207,17 @@ async def req(self, method: str, url: str, params: ReqParams = None) -> httpx.Re
ctx.req_count += 1 # count only successful
retry_count = 0
return rep
except (RateLimitError, BannedError):
# already handled
continue
except AbortReqError:
# abort all queries
return
except HandledError:
# retry with new account
continue
except (httpx.ReadTimeout, httpx.ProxyError):
# http transport failed, just retry
# http transport failed, just retry with same account
continue
except Exception as e:
retry_count += 1
if retry_count >= 3:
logger.warning(f"Unknown error {type(e)}: {e}")
logger.warning(f"Unhandled error {type(e)}: {e}")
await self._close_ctx(utc.ts() + 60 * 15) # 15 minutes

0 comments on commit d78e33d

Please sign in to comment.