diff --git a/cyberdrop_dl/scraper/crawlers/xenforo_crawler.py b/cyberdrop_dl/scraper/crawlers/xenforo_crawler.py
index c10426299..64f0bf624 100644
--- a/cyberdrop_dl/scraper/crawlers/xenforo_crawler.py
+++ b/cyberdrop_dl/scraper/crawlers/xenforo_crawler.py
@@ -129,34 +129,17 @@ async def fetch(self, scrape_item: ScrapeItem) -> None:
         elif self.thread_url_part in scrape_item.url.parts:
             await self.thread(scrape_item)
         elif any(p in scrape_item.url.parts for p in ("goto", "posts")):
-            await self.handle_redirect(scrape_item)
+            await self.redirect(scrape_item)
         else:
             raise ValueError
 
     @error_handling_wrapper
-    async def handle_redirect(self, scrape_item: ScrapeItem) -> None:
+    async def redirect(self, scrape_item: ScrapeItem) -> None:
         async with self.request_limiter:
             _, url = await self.client.get_soup_and_return_url(self.domain, scrape_item.url, origin=scrape_item) # type: ignore
         scrape_item.url = url
         self.manager.task_group.create_task(self.run(scrape_item))
 
-    @error_handling_wrapper
-    async def login_setup(self) -> None:
-        login_url = self.primary_base_domain / "login"
-        host_cookies: dict = self.client.client_manager.cookies.filter_cookies(self.primary_base_domain)
-        session_cookie = host_cookies.get("xf_user")
-        session_cookie = session_cookie.value if session_cookie else None
-        if session_cookie:
-            self.logged_in = True
-            return
-
-        forums_auth_data = self.manager.config_manager.authentication_data.forums
-        session_cookie = getattr(forums_auth_data, f"{self.domain}_xf_user_cookie")
-        username = getattr(forums_auth_data, f"{self.domain}_username")
-        password = getattr(forums_auth_data, f"{self.domain}_password")
-
-        await self.forum_login(login_url, session_cookie, username, password)
-
     @error_handling_wrapper
     async def thread(self, scrape_item: ScrapeItem) -> None:
         """Scrapes a forum thread."""
@@ -238,16 +221,23 @@ async def attachments(self, scrape_item: ScrapeItem, post: ForumPost) -> None:
 
     """~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"""
 
-    def process_embed(self, data: str) -> str | None:
-        if not data:
-            return
-        data = data.replace(r"\/\/", "https://www.").replace("\\", "")
-        embed = re.search(HTTP_URL_PATTERNS[0], data) or re.search(HTTP_URL_PATTERNS[1], data)
-        return embed.group(0).replace("www.", "") if embed else data
-
     async def filter_link(self, link: URL | None) -> URL | None:
         return link
 
+    async def thread_pager(self, scrape_item: ScrapeItem) -> AsyncGenerator[BeautifulSoup]:
+        """Generator of forum thread pages."""
+        page_url = scrape_item.url
+        while True:
+            async with self.request_limiter:
+                soup: BeautifulSoup = await self.client.get_soup(self.domain, page_url, origin=scrape_item)
+            next_page = soup.select_one(self.selectors.next_page.element)
+            yield soup
+            if not next_page:
+                break
+            page_url_str: str = next_page.get(self.selectors.next_page.attribute)
+            page_url = self.parse_url(page_url_str)
+            page_url = await self.pre_filter_link(page_url)
+
     async def process_children(self, scrape_item: ScrapeItem, links: list[Tag], selector: str) -> None:
         for link_obj in links:
             link_tag: Tag | str = link_obj.get(selector)
@@ -285,6 +275,20 @@ def _(self, link_str: str) -> bool:
         link = self.parse_url(link_str)
         return self.is_attachment(link)
 
+    @singledispatchmethod
+    async def get_absolute_link(self, link: URL) -> URL | None:
+        absolute_link = link
+        if self.is_confirmation_link(link):
+            absolute_link = await self.handle_confirmation_link(link)
+        return absolute_link
+
+    @get_absolute_link.register
+    async def _(self, link: str) -> URL | None:
+        parsed_link = None
+        link_str: str = link.replace(".th.", ".").replace(".md.", ".").replace("ifr", "watch")
+        parsed_link = self.parse_url(link_str)
+        return await self.get_absolute_link(parsed_link)
+
     async def handle_link(self, scrape_item: ScrapeItem, link: URL) -> None:
         if not link:
             return
@@ -306,9 +310,6 @@ async def handle_internal_link(self, link: URL, scrape_item: ScrapeItem) -> None:
         new_scrape_item = self.create_scrape_item(scrape_item, link, "Attachments", part_of_album=True)
         await self.handle_file(link, new_scrape_item, filename, ext)
 
-    def is_confirmation_link(self, link: URL) -> bool:
-        return any(keyword in link.path for keyword in ("link-confirmation",))
-
     @error_handling_wrapper
     async def handle_confirmation_link(self, link: URL, *, origin: ScrapeItem | None = None) -> URL | None:
         """Handles link confirmation."""
@@ -327,26 +328,45 @@ async def write_last_forum_post(self, thread_url: URL, post_number: int | None) -> None:
         last_post_url = thread_url / post_string
         await self.manager.log_manager.write_last_post_log(last_post_url)
 
-    async def thread_pager(self, scrape_item: ScrapeItem) -> AsyncGenerator[BeautifulSoup]:
-        """Generator of forum thread pages."""
-        page_url = scrape_item.url
-        while True:
-            async with self.request_limiter:
-                soup: BeautifulSoup = await self.client.get_soup(self.domain, page_url, origin=scrape_item)
-            next_page = soup.select_one(self.selectors.next_page.element)
-            yield soup
-            if not next_page:
-                break
-            page_url_str: str = next_page.get(self.selectors.next_page.attribute)
-            page_url = self.parse_url(page_url_str)
-            page_url = await self.pre_filter_link(page_url)
+    def is_valid_post_link(self, link_obj: Tag) -> bool:
+        is_image = link_obj.select_one("img")
+        link_str: str = link_obj.get(self.selectors.posts.links.element)
+        return not (is_image or self.is_attachment(link_str))
+
+    def is_confirmation_link(self, link: URL) -> bool:
+        return any(keyword in link.path for keyword in ("link-confirmation",))
+
+    def process_embed(self, data: str) -> str | None:
+        if not data:
+            return
+        data = data.replace(r"\/\/", "https://www.").replace("\\", "")
+        embed = re.search(HTTP_URL_PATTERNS[0], data) or re.search(HTTP_URL_PATTERNS[1], data)
+        return embed.group(0).replace("www.", "") if embed else data
+
+    """ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"""
+
+    @error_handling_wrapper
+    async def login_setup(self) -> None:
+        login_url = self.primary_base_domain / "login"
+        host_cookies: dict = self.client.client_manager.cookies.filter_cookies(self.primary_base_domain)
+        session_cookie = host_cookies.get("xf_user")
+        session_cookie = session_cookie.value if session_cookie else None
+        if session_cookie:
+            self.logged_in = True
+            return
+
+        forums_auth_data = self.manager.config_manager.authentication_data.forums
+        session_cookie = getattr(forums_auth_data, f"{self.domain}_xf_user_cookie")
+        username = getattr(forums_auth_data, f"{self.domain}_username")
+        password = getattr(forums_auth_data, f"{self.domain}_password")
+
+        await self.forum_login(login_url, session_cookie, username, password)
 
     @error_handling_wrapper
     async def forum_login(self, login_url: URL, session_cookie: str, username: str, password: str) -> None:
         """Logs into a forum."""
         attempt = 0
-
         text, logged_in = await self.check_login_with_request(login_url)
         if logged_in:
             self.logged_in = True
@@ -399,24 +419,7 @@ async def check_login_with_request(self, login_url: URL) -> tuple[str, bool]:
             text = await self.client.get_text(self.domain, login_url)
         return text, any(p in text for p in ('', "You are already logged in."))
 
-    def is_valid_post_link(self, link_obj: Tag) -> bool:
-        is_image = link_obj.select_one("img")
-        link_str: str = link_obj.get(self.selectors.posts.links.element)
-        return not (is_image or self.is_attachment(link_str))
-
-    @singledispatchmethod
-    async def get_absolute_link(self, link: URL) -> URL | None:
-        absolute_link = link
-        if self.is_confirmation_link(link):
-            absolute_link = await self.handle_confirmation_link(link)
-        return absolute_link
-
-    @get_absolute_link.register
-    async def _(self, link: str) -> URL | None:
-        parsed_link = None
-        link_str: str = link.replace(".th.", ".").replace(".md.", ".").replace("ifr", "watch")
-        parsed_link = self.parse_url(link_str)
-        return await self.get_absolute_link(parsed_link)
+    """~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"""
 
     def get_thread_info(self, url: URL) -> ThreadInfo:
         name_index = url.parts.index(self.thread_url_part) + 1
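
Note: the get_absolute_link pair moved in this patch relies on functools.singledispatchmethod, which picks an implementation from the type of the first non-self argument and works with async def as long as the caller awaits the result; the str overload cleans the raw string and re-dispatches with a parsed URL. A minimal runnable sketch of that pattern (DemoCrawler, main, and the yarl import are illustrative assumptions, not part of this patch):

import asyncio
from functools import singledispatchmethod

from yarl import URL


class DemoCrawler:
    @singledispatchmethod
    async def get_absolute_link(self, link: URL) -> URL | None:
        # Base case: link is already a URL object.
        return link

    @get_absolute_link.register
    async def _(self, link: str) -> URL | None:
        # str case: normalize the raw string (hypothetical cleanup rule),
        # then re-dispatch with a parsed URL object.
        return await self.get_absolute_link(URL(link.replace(".th.", ".")))


async def main() -> None:
    crawler = DemoCrawler()
    # Both calls go through the same dispatcher; only the argument type differs.
    print(await crawler.get_absolute_link("https://example.com/img.th.jpg"))
    print(await crawler.get_absolute_link(URL("https://example.com/img.jpg")))


if __name__ == "__main__":
    asyncio.run(main())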
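
Note: thread_pager (also relocated here) is an async generator: it yields each page's soup before following the next-page link, so the caller can scrape page one while pagination continues. A stripped-down sketch of the same pattern; fetch_html and NEXT_SELECTOR are stand-ins for the crawler's rate-limited client and configured selectors, not its real API:

import asyncio
from typing import AsyncGenerator

from bs4 import BeautifulSoup

NEXT_SELECTOR = "a.pageNav-jump--next"  # assumed XenForo-style selector


async def fetch_html(url: str) -> str:
    # Stand-in for the rate-limited client.get_soup call.
    await asyncio.sleep(0)
    return "<html></html>"


async def thread_pager(start_url: str) -> AsyncGenerator[BeautifulSoup, None]:
    page_url = start_url
    while True:
        soup = BeautifulSoup(await fetch_html(page_url), "html.parser")
        next_page = soup.select_one(NEXT_SELECTOR)
        yield soup  # caller processes this page's posts
        if not next_page:
            break
        page_url = next_page["href"]


async def main() -> None:
    async for soup in thread_pager("https://forum.example/threads/demo.1/"):
        ...  # scrape posts from each page


if __name__ == "__main__":
    asyncio.run(main())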