class BaseParser(object):
    """Common base class of every page class and element class.

    It supplies the public lookup entry points (``ele()``, ``eles()`` and the
    call shortcut) and forwards all of them to ``_ele()``, which subclasses
    are expected to implement.
    """

    def __call__(self, locator, index=1):
        """Shortcut for ``ele()``, so ``obj(locator)`` equals ``obj.ele(locator)``.

        :param locator: locator forwarded to ``ele()``
        :param index: which match to return, counted from 1 (the type stub
                      already advertises this parameter)
        :return: whatever ``ele()`` returns
        """
        return self.ele(locator, index=index)

    def ele(self, locator, index=1, timeout=None):
        """Look up a single element.

        :param locator: locator of the element
        :param index: which match to return, counted from 1
        :param timeout: seconds to wait, None leaves the choice to ``_ele()``
        :return: whatever ``_ele()`` returns
        """
        return self._ele(locator, timeout, index=index, method='ele()')

    def eles(self, locator, timeout=None):
        """Look up every match; ``index=None`` asks ``_ele()`` for all of them.

        :param locator: locator of the elements
        :param timeout: seconds to wait, None leaves the choice to ``_ele()``
        :return: whatever ``_ele()`` returns
        """
        return self._ele(locator, timeout, index=None)

    # ---------- attributes and methods below are placeholders for subclasses ----------
    @property
    def html(self):
        return ''

    def s_ele(self, locator=None, index=1):
        pass

    def s_eles(self, locator):
        pass

    def _ele(self, locator, timeout=None, index=1, raise_err=None, method=None):
        pass

    def _find_elements(self, locator, timeout=None, index=1, relative=False, raise_err=None):
        pass
def texts(self, text_node_only=False):
    """Collect the texts found directly under this element.

    :param text_node_only: when True only text nodes are queried; otherwise
                           child elements are queried as well and contribute
                           their ``text`` attribute
    :return: list of texts, each stripped of surrounding spaces and trailing
             newlines and passed through ``format_html``; entries that are
             empty or contain only whitespace are left out
    """
    if text_node_only:
        raw_items = self.eles('xpath:/text()')
    else:
        raw_items = []
        for node in self.eles('xpath:./text() | *'):
            # text nodes come back as plain strings, elements expose .text
            raw_items.append(node if isinstance(node, str) else node.text)

    cleaned = []
    for item in raw_items:
        # drop empty values and strings made only of whitespace
        if not item or sub('[\r\n\t ]', '', item) == '':
            continue
        cleaned.append(format_html(item.strip(' ').rstrip('\n')))
    return cleaned
def child(self, locator='', index=1, timeout=None, ele_only=True):
    """Return one direct child element or node, optionally filtered.

    :param locator: locator used as filter; an int is taken as ``index``
    :param index: which match to return, counted from 1
    :param timeout: seconds to wait for the node
    :param ele_only: True matches elements only (``*``); False matches any
                     node (``node()``); only used when no locator is given
    :return: the value returned by ``_ele()`` when it is truthy, otherwise a
             NoneElement
    :raises ValueError: the locator resolves to a css selector
    :raises ElementNotFoundError: nothing found and
                                  ``Settings.raise_when_ele_not_found`` is set
    """
    if isinstance(locator, int):
        # child(3) is shorthand for child(index=3)
        locator, index = '', locator

    if locator:
        parsed = get_loc(locator, True)  # convert the locator to xpath
        if parsed[0] == 'css selector':
            raise ValueError('此css selector语法不受支持,请换成xpath。')
        step = parsed[1].lstrip('./')
    elif ele_only:
        step = '*'
    else:
        step = 'node()'

    found = self._ele(f'xpath:./{step}', timeout=timeout, index=index, relative=True, raise_err=False)
    if found:
        return found

    info = {'locator': locator, 'index': index, 'ele_only': ele_only}
    if Settings.raise_when_ele_not_found:
        raise ElementNotFoundError(None, 'child()', info)
    return NoneElement(self.owner, 'child()', info)
def children(self, locator='', timeout=None, ele_only=True):
    """Return the direct children of this element, optionally filtered.

    :param locator: locator used as filter
    :param timeout: seconds to wait for the nodes
    :param ele_only: True matches elements only (``*``); False matches any
                     node (``node()``); only used when no locator is given
    :return: list of the values returned by ``_ele()``, without strings that
             are empty or contain only whitespace
    :raises ValueError: the locator resolves to a css selector
    """
    if locator:
        parsed = get_loc(locator, True)  # convert the locator to xpath
        if parsed[0] == 'css selector':
            raise ValueError('此css selector语法不受支持,请换成xpath。')
        step = parsed[1].lstrip('./')
    elif ele_only:
        step = '*'
    else:
        step = 'node()'

    found = self._ele(f'xpath:./{step}', timeout=timeout, index=None, relative=True)

    kept = []
    for item in found:
        # keep every non-string, and every string with visible content
        if not isinstance(item, str) or sub('[ \n\t\r]', '', item) != '':
            kept.append(item)
    return kept
def afters(self, locator='', timeout=None, ele_only=True):
    """Return the elements or nodes located after this element.

    The lookup is delegated to ``_get_relatives()`` with the 'following'
    direction and ``brother=False``, so it is not restricted to siblings.

    :param locator: locator used as filter
    :param timeout: seconds to wait for the nodes
    :param ele_only: True returns elements only; False also includes text and
                     comment nodes
    :return: whatever ``_get_relatives()`` returns for these arguments
    """
    query = {
        'locator': locator,
        'direction': 'following',
        'brother': False,
        'timeout': timeout,
        'ele_only': ele_only,
    }
    return self._get_relatives(**query)
selector语法不受支持,请换成xpath。') + loc = loc[1].lstrip('./') + + loc = f'xpath:./{direction}{brother}::{loc}' + + if index is not None: + index = index if direction == 'following' else -index + nodes = self._ele(loc, timeout=timeout, index=index, relative=True, raise_err=False) + if isinstance(nodes, list): + nodes = [e for e in nodes if not (isinstance(e, str) and sub('[ \n\t\r]', '', e) == '')] + return nodes + + # ----------------以下属性或方法由后代实现---------------- + @property + def attrs(self): + return + + @property + def text(self): + return + + @property + def raw_text(self): + return + + @abstractmethod + def attr(self, name: str): + return '' + + def _get_ele_path(self, mode): + return '' + + def _find_elements(self, locator, timeout=None, index=1, relative=False, raise_err=None): + pass + + +class BasePage(BaseParser): + """页面类的基类""" + + def __init__(self): + """初始化函数""" + self._url = None + self._timeout = 10 + self._url_available = None + self.retry_times = 3 + self.retry_interval = 2 + self._DownloadKit = None + self._download_path = None + self._none_ele_return_value = False + self._none_ele_value = None + self._type = 'BasePage' + + @property + def title(self): + """返回网页title""" + ele = self._ele('xpath://title', raise_err=False, method='title') + return ele.text if ele else None + + @property + def timeout(self): + """返回查找元素时等待的秒数""" + return self._timeout + + @timeout.setter + def timeout(self, second): + """设置查找元素时等待的秒数""" + self._timeout = second + + @property + def url_available(self): + """返回当前访问的url有效性""" + return self._url_available + + @property + def download_path(self): + """返回默认下载路径""" + return self._download_path + + @property + def download(self): + """返回下载器对象""" + if self._DownloadKit is None: + self._DownloadKit = DownloadKit(driver=self, goal_path=self.download_path) + return self._DownloadKit + + def _before_connect(self, url, retry, interval): + """连接前的准备 + :param url: 要访问的url + :param retry: 重试次数 + :param interval: 重试间隔 + :return: 
重试次数、间隔、是否文件组成的tuple + """ + is_file = False + if isinstance(url, Path) or ('://' not in url and ':\\\\' not in url): + p = Path(url) + if p.exists(): + url = str(p.absolute()) + is_file = True + + self._url = url if is_file else quote(url, safe='-_.~!*\'"();:@&=+$,/\\?#[]%') + retry = retry if retry is not None else self.retry_times + interval = interval if interval is not None else self.retry_interval + return retry, interval, is_file + + # ----------------以下属性或方法由后代实现---------------- + @property + def url(self): + return + + @property + def json(self): + return + + @property + def user_agent(self): + return + + @abstractmethod + def cookies(self, as_dict=False, all_info=False): + return {} + + @abstractmethod + def get(self, url, show_errmsg=False, retry=None, interval=None): + pass + + def _ele(self, locator, timeout=None, index=1, raise_err=None, method=None): + """调用获取元素的方法 + :param locator: 定位符 + :param timeout: 超时时间(秒) + :param index: 获取第几个,从1开始,可传入负数获取倒数第几个 + :param raise_err: 找不到时是否抛出异常 + :param method: 调用的方法名 + :return: 元素对象或它们组成的列表 + """ + if not locator: + raise ElementNotFoundError(None, method, {'locator': locator}) + + r = self._find_elements(locator, timeout=timeout, index=index, raise_err=raise_err) + + if r or isinstance(r, list): + return r + if Settings.raise_when_ele_not_found or raise_err is True: + raise ElementNotFoundError(None, method, {'locator': locator, 'index': index}) + + r.method = method + r.args = {'locator': locator, 'index': index} + return r diff --git a/DrissionPage/_base/base.pyi b/DrissionPage/_base/base.pyi new file mode 100644 index 0000000..8b73101 --- /dev/null +++ b/DrissionPage/_base/base.pyi @@ -0,0 +1,250 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +@Copyright: (c) 2024 by g1879, Inc. All Rights Reserved. +@License : BSD 3-Clause. 
class BaseParser(object):
    # Type stub of the common base class of pages and elements.
    _type: str

    # Call shortcut; takes the same locator and index as ele().
    def __call__(self, locator: Union[Tuple[str, str], str], index: int = 1): ...

    # Single-element lookup; index counts from 1.
    def ele(self,
            locator: Union[Tuple[str, str], str, BaseElement],
            index: int = 1,
            timeout: Optional[float] = None): ...

    # Multi-element lookup.
    def eles(self, locator: Union[Tuple[str, str], str], timeout: Optional[float] = None): ...

    # ---------- attributes and methods below are to be implemented by subclasses ----------
    @property
    def html(self) -> str: ...

    def s_ele(self,
              locator: Union[Tuple[str, str], str, BaseElement, None] = None,
              index: int = 1) -> SessionElement: ...

    def s_eles(self, locator: Union[Tuple[str, str], str]) -> List[SessionElement]: ...

    # index is Optional: eles() passes None to request every match.
    def _ele(self,
             locator: Union[Tuple[str, str], str],
             timeout: Optional[float] = None,
             index: Optional[int] = 1,
             raise_err: Optional[bool] = None,
             method: Optional[str] = None): ...

    def _find_elements(self,
                       locator: Union[Tuple[str, str], str],
                       timeout: Optional[float] = None,
                       index: Optional[int] = 1,
                       relative: bool = False,
                       raise_err: Optional[bool] = None): ...
+ + def nexts(self): ... + + +class DrissionElement(BaseElement): + + def __init__(self, owner: BasePage = None): ... + + @property + def link(self) -> str: ... + + @property + def css_path(self) -> str: ... + + @property + def xpath(self) -> str: ... + + @property + def comments(self) -> list: ... + + def texts(self, text_node_only: bool = False) -> list: ... + + def parent(self, + level_or_loc: Union[tuple, str, int] = 1, + index: int = 1) -> Union[DrissionElement, None]: ... + + def child(self, + locator: Union[Tuple[str, str], str, int] = '', + index: int = 1, + timeout: float = None, + ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ... + + def prev(self, + locator: Union[Tuple[str, str], str, int] = '', + index: int = 1, + timeout: float = None, + ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ... + + def next(self, + locator: Union[Tuple[str, str], str, int] = '', + index: int = 1, + timeout: float = None, + ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ... + + def before(self, + locator: Union[Tuple[str, str], str, int] = '', + index: int = 1, + timeout: float = None, + ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ... + + def after(self, + locator: Union[Tuple[str, str], str, int] = '', + index: int = 1, + timeout: float = None, + ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ... + + def children(self, + locator: Union[Tuple[str, str], str] = '', + timeout: float = None, + ele_only: bool = True) -> List[Union[DrissionElement, str]]: ... + + def prevs(self, + locator: Union[Tuple[str, str], str] = '', + timeout: float = None, + ele_only: bool = True) -> List[Union[DrissionElement, str]]: ... + + def nexts(self, + locator: Union[Tuple[str, str], str] = '', + timeout: float = None, + ele_only: bool = True) -> List[Union[DrissionElement, str]]: ... 
+ + def befores(self, + locator: Union[Tuple[str, str], str] = '', + timeout: float = None, + ele_only: bool = True) -> List[Union[DrissionElement, str]]: ... + + def afters(self, + locator: Union[Tuple[str, str], str] = '', + timeout: float = None, + ele_only: bool = True) -> List[Union[DrissionElement, str]]: ... + + def _get_relative(self, + func: str, + direction: str, + brother: bool, + locator: Union[Tuple[str, str], str] = '', + index: int = 1, + timeout: float = None, + ele_only: bool = True) -> DrissionElement: ... + + def _get_relatives(self, + index: int = None, + locator: Union[Tuple[str, str], str] = '', + direction: str = 'following', + brother: bool = True, + timeout: float = 0.5, + ele_only: bool = True) -> List[Union[DrissionElement, str]]: ... + + # ----------------以下属性或方法由后代实现---------------- + @property + def attrs(self) -> dict: ... + + @property + def text(self) -> str: ... + + @property + def raw_text(self) -> str: ... + + @abstractmethod + def attr(self, name: str) -> str: ... + + def _get_ele_path(self, mode) -> str: ... + + +class BasePage(BaseParser): + + def __init__(self): + self._url_available: bool = ... + self.retry_times: int = ... + self.retry_interval: float = ... + self._timeout: float = ... + self._download_path: str = ... + self._DownloadKit: DownloadKit = ... + self._none_ele_return_value: bool = ... + self._none_ele_value: Any = ... + self._page: Union[ChromiumPage, SessionPage, WebPage]=... + + @property + def title(self) -> Union[str, None]: ... + + @property + def timeout(self) -> float: ... + + @timeout.setter + def timeout(self, second: float) -> None: ... + + @property + def url_available(self) -> bool: ... + + @property + def download_path(self) -> str: ... + + @property + def download(self) -> DownloadKit: ... + + def _before_connect(self, url: str, retry: int, interval: float) -> tuple: ... + + # ----------------以下属性或方法由后代实现---------------- + @property + def url(self) -> str: ... 
def __init__(self, address, browser_id, page):
    """Set up the browser-level connection; runs once per browser id.

    ``__new__`` returns the cached instance for a known ``browser_id``, and
    Python then calls ``__init__`` on it again; the ``_created`` flag turns
    those repeated calls into no-ops.

    :param address: browser address
    :param browser_id: browser id
    :param page: ChromiumPage object
    """
    if hasattr(self, '_created'):
        return
    self._created = True
    Browser.BROWSERS[browser_id] = self

    self.page = page
    self.address = address
    self._driver = BrowserDriver(browser_id, 'browser', address, self)
    self.id = browser_id
    self._frames = {}
    self._drivers = {}
    self._all_drivers = {}
    self._connected = False

    # Best effort: the process id stays None when the browser cannot report it.
    # Catch Exception, not everything, so Ctrl-C and SystemExit still propagate.
    self._process_id = None
    try:
        processes = self.run_cdp('SystemInfo.getProcessInfo').get('processInfo', [])
        self._process_id = next((p['id'] for p in processes if p['type'] == 'browser'), None)
    except Exception:
        pass

    self.run_cdp('Target.setDiscoverTargets', discover=True)
    self._driver.set_callback('Target.targetDestroyed', self._onTargetDestroyed)
    self._driver.set_callback('Target.targetCreated', self._onTargetCreated)
def find_tabs(self, title=None, url=None, tab_type=None):
    """Return the tabs matching every given condition.

    The tab list is read from the browser's ``/json`` endpoint.

    :param title: text that must be contained in the tab title
    :param url: text that must be contained in the tab url
    :param tab_type: accepted tab type, or a list/tuple/set of types
    :return: list of tab info dicts
    :raises TypeError: tab_type is not a str, list, tuple, set or None
    """
    all_tabs = self._driver.get(f'http://{self.address}/json').json()  # do not switch to cdp

    if isinstance(tab_type, str):
        tab_type = {tab_type}
    elif isinstance(tab_type, (list, tuple, set)):
        tab_type = set(tab_type)
    elif tab_type is not None:
        raise TypeError('tab_type只能是set、list、tuple、str、None。')

    def wanted(info):
        # conditions are checked in order: title, url, then type
        if title is not None and title not in info['title']:
            return False
        if url is not None and url not in info['url']:
            return False
        return tab_type is None or info['type'] in tab_type

    return list(filter(wanted, all_tabs))
def quit(self, timeout=5, force=False):
    """Close the browser.

    :param timeout: seconds to wait for the killed processes to disappear,
                    only used when ``force`` is True
    :param force: whether to kill the browser processes after closing
    :return: None
    """
    pids = []
    if force:
        # Must happen before the connection is closed: once the driver is
        # stopped, Driver.run() only returns a connection error, so asking
        # afterwards never yields any process id.
        try:
            pids = [p['id'] for p in self.run_cdp('SystemInfo.getProcessInfo')['processInfo']]
        except Exception:
            pids = []
        if not pids and self._process_id:
            pids = [self._process_id]  # fall back to the id recorded at start-up

    try:
        self.run_cdp('Browser.close')
    except PageDisconnectedError:
        pass
    self.driver.stop()

    # iterate over copies, stopping a driver may trigger callbacks that edit these containers
    for tab_drivers in list(self._all_drivers.values()):
        for driver in list(tab_drivers):
            driver.stop()

    if not force or not pids:
        return

    from psutil import Process, pid_exists
    for pid in pids:
        try:
            Process(pid).kill()
        except Exception:
            # already gone or not permitted: nothing more can be done here
            pass

    end_time = perf_counter() + timeout
    while perf_counter() < end_time:
        if not any(pid_exists(pid) for pid in pids):
            break
        sleep(.05)
class Browser(object):
    # Type stub of the browser-level connection object.
    BROWSERS: dict = ...  # cache of Browser instances keyed by browser id
    page: ChromiumPage = ...
    _driver: BrowserDriver = ...  # browser-level CDP connection
    id: str = ...
    address: str = ...
    _frames: dict = ...  # entries whose value equals a tab id are dropped when that tab is destroyed
    _drivers: Dict[str, Driver] = ...  # tab id -> Driver created on targetCreated, handed out by _get_driver()
    _all_drivers: Dict[str, Set[Driver]] = ...  # tab id -> every Driver opened for that tab
    _process_id: Optional[int] = ...  # None when the browser did not report it
    _dl_mgr: DownloadManager = ...  # only assigned by connect_to_page()
    _connected: bool = ...

    def __new__(cls, address: str, browser_id: str, page: ChromiumPage): ...

    def __init__(self, address: str, browser_id: str, page: ChromiumPage): ...

    def _get_driver(self, tab_id: str, owner=None) -> Driver: ...

    def run_cdp(self, cmd: str, **cmd_args) -> dict: ...

    @property
    def driver(self) -> BrowserDriver: ...

    @property
    def tabs_count(self) -> int: ...

    @property
    def tab_ids(self) -> List[str]: ...

    @property
    def process_id(self) -> Optional[int]: ...

    # title and url are combined with AND; tab_type may also be a set.
    def find_tabs(self, title: Optional[str] = None, url: Optional[str] = None,
                  tab_type: Union[str, list, tuple, set, None] = None) -> List[dict]: ...

    def close_tab(self, tab_id: str) -> None: ...

    def stop_driver(self, driver: Driver) -> None: ...

    def activate_tab(self, tab_id: str) -> None: ...

    # tab_id None means the browser's own id is used.
    def get_window_bounds(self, tab_id: Optional[str] = None) -> dict: ...

    def new_tab(self, new_window: bool = False, background: bool = False, new_context: bool = False) -> str: ...

    def reconnect(self) -> None: ...

    def connect_to_page(self) -> None: ...

    def _onTargetCreated(self, **kwargs) -> None: ...

    def _onTargetDestroyed(self, **kwargs) -> None: ...

    def quit(self, timeout: float = 5, force: bool = False) -> None: ...

    def _on_disconnect(self) -> None: ...
self._handle_immediate_event_th = None + + self._stopped = Event() + + self.event_handlers = {} + self.immediate_event_handlers = {} + self.method_results = {} + self.event_queue = Queue() + self.immediate_event_queue = Queue() + + self.start() + + def _send(self, message, timeout=None): + """发送信息到浏览器,并返回浏览器返回的信息 + :param message: 发送给浏览器的数据 + :param timeout: 超时时间,为None表示无限 + :return: 浏览器返回的数据 + """ + self._cur_id += 1 + ws_id = self._cur_id + message['id'] = ws_id + message_json = dumps(message) + + # if self._debug: + # if self._debug is True or (isinstance(self._debug, str) and + # message.get('method', '').startswith(self._debug)): + # print(f'发> {message_json}') + # elif isinstance(self._debug, (list, tuple, set)): + # for m in self._debug: + # if message.get('method', '').startswith(m): + # print(f'发> {message_json}') + # break + + end_time = perf_counter() + timeout if timeout is not None else None + self.method_results[ws_id] = Queue() + try: + self._ws.send(message_json) + if timeout == 0: + self.method_results.pop(ws_id, None) + return {'id': ws_id, 'result': {}} + + except (OSError, WebSocketConnectionClosedException): + self.method_results.pop(ws_id, None) + return {'error': {'message': 'connection disconnected'}, 'type': 'connection_error'} + + while not self._stopped.is_set(): + try: + result = self.method_results[ws_id].get(timeout=.2) + self.method_results.pop(ws_id, None) + return result + + except Empty: + if self.alert_flag and message['method'].startswith(('Input.', 'Runtime.')): + return {'error': {'message': 'alert exists.'}, 'type': 'alert_exists'} + + if timeout is not None and perf_counter() > end_time: + self.method_results.pop(ws_id, None) + return {'error': {'message': 'alert exists.'}, 'type': 'alert_exists'} \ + if self.alert_flag else {'error': {'message': 'timeout'}, 'type': 'timeout'} + + continue + + return {'error': {'message': 'connection disconnected'}, 'type': 'connection_error'} + + def _recv_loop(self): + """接收浏览器信息的守护线程方法""" 
def _handle_event_loop(self):
    """Run the handler bound to each queued browser event until the driver stops."""
    while not self._stopped.is_set():
        try:
            event = self.event_queue.get(timeout=1)
        except Empty:
            # Nothing arrived within a second; loop round to re-check the stop flag.
            pass
        else:
            callback = self.event_handlers.get(event['method'])
            if callback:
                callback(**event['params'])

            self.event_queue.task_done()


def _handle_immediate_event_loop(self):
    """Run queued immediate handlers until the queue is empty or the driver stops."""
    while True:
        if self._stopped.is_set() or self.immediate_event_queue.empty():
            return

        callback, params = self.immediate_event_queue.get(timeout=1)
        try:
            callback(**params)
        except PageDisconnectedError:
            # The page went away while the handler ran; move on to the next item.
            continue
def run(self, _method, **kwargs):
    """Execute a CDP method.

    :param _method: CDP method name
    :param kwargs: CDP parameters; an optional '_timeout' entry overrides Settings.cdp_timeout
    :return: the 'result' payload of the reply, or a dict with 'error' and 'type'
             keys (plus 'method' and 'args' when the call itself failed)
    """
    if self._stopped.is_set():
        return {'error': 'connection disconnected', 'type': 'connection_error'}

    timeout = kwargs.pop('_timeout', Settings.cdp_timeout)
    result = self._send({'method': _method, 'params': kwargs}, timeout=timeout)
    if 'result' not in result and 'error' in result:
        # Put the timeout back so the reported args match what the caller passed.
        kwargs['_timeout'] = timeout
        return {'error': result['error']['message'], 'type': result.get('type', 'call_method_error'),
                'method': _method, 'args': kwargs}
    return result['result']


def start(self):
    """Open the websocket and start the receiving and event-handling threads.

    :return: True when connected, False when the handshake was rejected
    :raises RuntimeError: when the handshake fails with '403 Forbidden'
    """
    self._stopped.clear()
    try:
        self._ws = create_connection(self._websocket_url, enable_multithread=True, suppress_origin=True)
    except WebSocketBadStatusException as e:
        if 'Handshake status 403 Forbidden' in str(e):
            # Keep the original handshake error attached as the cause.
            raise RuntimeError('请升级websocket-client库。') from e
        # Return an explicit falsy bool rather than None so the result is always a bool.
        return False
    self._recv_th.start()
    self._handle_event_th.start()
    return True


def stop(self):
    """Close the connection and block until both worker threads have exited.

    :return: True
    """
    self._stop()
    while self._handle_event_th.is_alive() or self._recv_th.is_alive():
        sleep(.1)
    return True


def _stop(self):
    """Close the connection and drop pending state.

    :return: False when the driver was already stopped, otherwise None
    """
    if self._stopped.is_set():
        return False

    self._stopped.set()
    if self._ws:
        self._ws.close()
        self._ws = None

    self.event_handlers.clear()
    self.method_results.clear()
    # Clear the backing deque under the queue's own lock so a concurrent get()
    # in the event thread never pops from a deque emptied underneath it.
    with self.event_queue.mutex:
        self.event_queue.queue.clear()

    if hasattr(self.owner, '_on_disconnect'):
        self.owner._on_disconnect()
class BrowserDriver(Driver):
    """Driver that is created at most once per tab id and can issue plain HTTP GET requests."""

    # Every created instance, keyed by the tab id it was built for.
    BROWSERS = {}

    def __new__(cls, tab_id, tab_type, address, owner):
        """Return the instance already registered for tab_id, or allocate a new one."""
        if tab_id in cls.BROWSERS:
            return cls.BROWSERS[tab_id]
        return object.__new__(cls)

    def __init__(self, tab_id, tab_type, address, owner):
        """Initialise the driver once; later constructions for the same tab_id are no-ops.

        :param tab_id: id used as the registry key, forwarded to Driver
        :param tab_type: forwarded to Driver
        :param address: forwarded to Driver
        :param owner: forwarded to Driver
        """
        # __init__ runs again each time __new__ hands back a cached instance.
        if hasattr(self, '_created'):
            return
        self._created = True
        BrowserDriver.BROWSERS[tab_id] = self
        super().__init__(tab_id, tab_type, address, owner)
        self._control_session = Session()
        self._control_session.trust_env = False

    def __repr__(self):
        # An empty repr made instances invisible in logs and containers.
        return f'<BrowserDriver {self.id}>'

    def get(self, url):
        """Send a GET request through the control session.

        :param url: address to request
        :return: the response object, already closed
        """
        r = self._control_session.get(url, headers={'Connection': 'close'})
        r.close()
        return r
# Type stub for the Driver subclass that keeps a class-level registry of instances.
class BrowserDriver(Driver):
    # Class-level mapping from tab id to driver.
    BROWSERS: Dict[str, Driver] = ...
    owner: Browser = ...
    # requests session; the return type of get() suggests it backs that method -- confirm in driver.py.
    _control_session: Session = ...

    def __new__(cls, tab_id: str, tab_type: str, address: str, owner: Browser): ...

    def __init__(self, tab_id: str, tab_type: str, address: str, owner: Browser): ...

    # Returns a requests Response for the given url.
    def get(self, url) -> Response: ...