From d78e33d2ccbbd48325d67e6f6d70123a72042dde Mon Sep 17 00:00:00 2001 From: Vlad Pronsky Date: Fri, 5 Jan 2024 17:57:43 +0200 Subject: [PATCH] update ban detection; fix cli relogin command to relogin only selected accounts --- pyproject.toml | 2 +- twscrape/accounts_pool.py | 22 ++++++--- twscrape/cli.py | 1 + twscrape/imap.py | 2 +- twscrape/logger.py | 2 +- twscrape/queue_client.py | 98 ++++++++++++++++++++++----------------- 6 files changed, 75 insertions(+), 52 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 2c52ba1..f03a15f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ classifiers = [ ] dependencies = [ "aiosqlite>=0.17.0", - "fake-useragent>=1.3.0", + "fake-useragent>=1.4.0", "httpx>=0.24.0", "loguru>=0.7.0", ] diff --git a/twscrape/accounts_pool.py b/twscrape/accounts_pool.py index 8433c29..ac176fd 100644 --- a/twscrape/accounts_pool.py +++ b/twscrape/accounts_pool.py @@ -142,10 +142,14 @@ async def login(self, account: Account, email_first: bool = False): finally: await self.save(account) - async def login_all(self, email_first=False): - qs = "SELECT * FROM accounts WHERE active = false AND error_msg IS NULL" - rs = await fetchall(self._db_file, qs) + async def login_all(self, email_first=False, usernames: list[str] | None = None): + if usernames is None: + qs = "SELECT * FROM accounts WHERE active = false AND error_msg IS NULL" + else: + us = ",".join([f'"{x}"' for x in usernames]) + qs = f"SELECT * FROM accounts WHERE username IN ({us})" + rs = await fetchall(self._db_file, qs) accounts = [Account.from_rs(rs) for rs in rs] # await asyncio.gather(*[login(x) for x in self.accounts]) @@ -176,7 +180,7 @@ async def relogin(self, usernames: str | list[str], email_first=False): """ await execute(self._db_file, qs) - await self.login_all(email_first=email_first) + await self.login_all(email_first=email_first, usernames=usernames) async def relogin_failed(self, email_first=False): qs = "SELECT username FROM accounts WHERE active = false AND error_msg IS NOT NULL" @@ -248,13 +252,17 @@ async def get_for_queue(self, queue: str): return Account.from_rs(rs) if rs else None - async def get_for_queue_or_wait(self, queue: str) -> Account: + async def get_for_queue_or_wait(self, queue: str) -> Account | None: msg_shown = False while True: account = await self.get_for_queue(queue) if not account: if not msg_shown: nat = await self.next_available_at(queue) + if not nat: + logger.warning("No active accounts. Stopping...") + return None + msg = f'No account available for queue "{queue}". Next available at {nat}' logger.info(msg) msg_shown = True @@ -283,9 +291,9 @@ async def next_available_at(self, queue: str): at_local = datetime.now() + (trg - now) return at_local.strftime("%H:%M:%S") - return "none" + return None - async def mark_banned(self, username: str, error_msg: str): + async def mark_inactive(self, username: str, error_msg: str | None): qs = """ UPDATE accounts SET active = false, error_msg = :error_msg WHERE username = :username diff --git a/twscrape/cli.py b/twscrape/cli.py index ce6ab9b..b841ca7 100644 --- a/twscrape/cli.py +++ b/twscrape/cli.py @@ -73,6 +73,7 @@ async def main(args): if args.command == "add_accounts": await pool.load_from_file(args.file_path, args.line_format) + print("\nNow run:\ntwscrape login_accounts") return if args.command == "del_accounts": diff --git a/twscrape/imap.py b/twscrape/imap.py index 1ba0ab9..b29cc60 100644 --- a/twscrape/imap.py +++ b/twscrape/imap.py @@ -10,7 +10,7 @@ _env = dict(os.environ) -LOGIN_CODE_TIMEOUT = int_or(_env, "LOGIN_CODE_TIMEOUT") or 40 +LOGIN_CODE_TIMEOUT = int_or(_env, "LOGIN_CODE_TIMEOUT") or 30 class EmailLoginError(Exception): diff --git a/twscrape/logger.py b/twscrape/logger.py index 774f392..209a426 100644 --- a/twscrape/logger.py +++ b/twscrape/logger.py @@ -3,7 +3,7 @@ from loguru import logger -_LEVELS = Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] +_LEVELS = Literal["TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] _LOG_LEVEL: _LEVELS = "INFO" diff --git a/twscrape/queue_client.py b/twscrape/queue_client.py index 37edb9f..ef84544 100644 --- a/twscrape/queue_client.py +++ b/twscrape/queue_client.py @@ -19,11 +19,7 @@ def __init__(self, acc: Account, clt: httpx.AsyncClient): self.req_count = 0 -class RateLimitError(Exception): - pass - - -class BannedError(Exception): +class HandledError(Exception): pass @@ -82,7 +78,7 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): await self._close_ctx() - async def _close_ctx(self, reset_at=-1, banned=False, msg=""): + async def _close_ctx(self, reset_at=-1, inactive=False, msg: str | None = None): if self.ctx is None: return @@ -90,8 +86,8 @@ async def _close_ctx(self, reset_at=-1, banned=False, msg=""): username = ctx.acc.username await ctx.clt.aclose() - if banned: - await self.pool.mark_banned(username, msg) + if inactive: + await self.pool.mark_inactive(username, msg) return if reset_at > 0: @@ -123,60 +119,75 @@ async def _check_rep(self, rep: httpx.Response) -> None: except json.JSONDecodeError: res: Any = {"_raw": rep.text} + limit_remaining = int(rep.headers.get("x-rate-limit-remaining", -1)) + limit_reset = int(rep.headers.get("x-rate-limit-reset", -1)) + # limit_max = int(rep.headers.get("x-rate-limit-limit", -1)) + err_msg = "OK" if "errors" in res: err_msg = set([f'({x.get("code", -1)}) {x["message"]}' for x in res["errors"]]) err_msg = "; ".join(list(err_msg)) - if self.debug: - fn = logger.debug if rep.status_code == 200 else logger.warning - fn(f"{rep.status_code:3d} - {req_id(rep)} - {err_msg}") + log_msg = f"{rep.status_code:3d} - {req_id(rep)} - {err_msg}" + print(log_msg) + logger.trace(log_msg) - # need to add some features in api.py + # for dev: need to add some features in api.py if err_msg.startswith("(336) The following features cannot be null"): - logger.error(f"Update required: {err_msg}") + logger.error(f"[DEV] Update required: {err_msg}") exit(1) # general api rate limit - if int(rep.headers.get("x-rate-limit-remaining", -1)) == 0: - await self._close_ctx(int(rep.headers.get("x-rate-limit-reset", -1))) - raise RateLimitError(err_msg) + if limit_remaining == 0 and limit_reset > 0: + logger.debug(f"Rate limited: {log_msg}") + await self._close_ctx(limit_reset) + raise HandledError() - # possible new limits for tweets view per account - if err_msg.startswith("(88) Rate limit exceeded") or rep.status_code == 429: - await self._close_ctx(utc.ts() + 60 * 60 * 4) # lock for 4 hours - raise RateLimitError(err_msg) + # no way to check is account banned in direct way, but this check should work + if err_msg.startswith("(88) Rate limit exceeded") and limit_remaining > 0: + logger.warning(f"Ban detected: {log_msg}") + await self._close_ctx(-1, inactive=True, msg=err_msg) + raise HandledError() if err_msg.startswith("(326) Authorization: Denied by access control"): - await self._close_ctx(-1, banned=True, msg=err_msg) - raise BannedError(err_msg) - - # Something from twitter side, abort request so it doesn't hang - # https://github.com/vladkens/twscrape/pull/80 - if err_msg.startswith("(131) Dependency: Internal error."): + logger.warning(f"Ban detected: {log_msg}") + await self._close_ctx(-1, inactive=True, msg=err_msg) + raise HandledError() + + if err_msg.startswith("(32) Could not authenticate you"): + logger.warning(f"Session expired or banned: {log_msg}") + await self._close_ctx(-1, inactive=True, msg=err_msg) + raise HandledError() + + if err_msg == "OK" and rep.status_code == 403: + logger.warning(f"Session expired or banned: {log_msg}") + await self._close_ctx(-1, inactive=True, msg=None) + raise HandledError() + + # something from twitter side - abort all queries, see: https://github.com/vladkens/twscrape/pull/80 + if err_msg.startswith("(131) Dependency: Internal error"): logger.warning(f"Dependency error (request skipped): {err_msg}") raise AbortReqError() - # possible banned by old api flow - if rep.status_code in (401, 403): - await self._close_ctx(utc.ts() + 60 * 60 * 12) # lock for 12 hours - raise RateLimitError(err_msg) - # content not found - if rep.status_code == 200 and "_Missing: No status found with that ID." in err_msg: + if rep.status_code == 200 and "_Missing: No status found with that ID" in err_msg: return # ignore this error - # Something from twitter side, just ignore it - # https://github.com/vladkens/twscrape/pull/95 + # something from twitter side - just ignore it, see: https://github.com/vladkens/twscrape/pull/95 if rep.status_code == 200 and "Authorization" in err_msg: - logger.warning(f"Authorization unknown error: {err_msg}") + logger.warning(f"Authorization unknown error: {log_msg}") return if err_msg != "OK": - logger.warning(f"API unknown error: {err_msg}") + logger.warning(f"API unknown error: {log_msg}") return # ignore any other unknown errors - rep.raise_for_status() + try: + rep.raise_for_status() + except httpx.HTTPStatusError: + logger.error(f"Unhandled API response code: {log_msg}") + await self._close_ctx(utc.ts() + 60 * 15) # 15 minutes + raise HandledError() async def get(self, url: str, params: ReqParams = None): return await self.req("GET", url, params=params) @@ -185,6 +196,8 @@ async def req(self, method: str, url: str, params: ReqParams = None) -> httpx.Re retry_count = 0 while True: ctx = await self._get_ctx() + if ctx is None: + return None try: rep = await ctx.clt.request(method, url, params=params) @@ -194,16 +207,17 @@ async def req(self, method: str, url: str, params: ReqParams = None) -> httpx.Re ctx.req_count += 1 # count only successful retry_count = 0 return rep - except (RateLimitError, BannedError): - # already handled - continue except AbortReqError: + # abort all queries return + except HandledError: + # retry with new account + continue except (httpx.ReadTimeout, httpx.ProxyError): - # http transport failed, just retry + # http transport failed, just retry with same account continue except Exception as e: retry_count += 1 if retry_count >= 3: - logger.warning(f"Unknown error {type(e)}: {e}") + logger.warning(f"Unhandled error {type(e)}: {e}") await self._close_ctx(utc.ts() + 60 * 15) # 15 minutes