From dd17ec076de354c8b00846ae4329a55e8c11c710 Mon Sep 17 00:00:00 2001
From: "tan.monivisal"
Date: Tue, 18 Jul 2023 10:21:20 +0700
Subject: [PATCH] Remove persistent session login and translation extraction;
 update login cookie handling and page-parsing selectors

---
 facebook_scraper/__init__.py         |  40 +--------
 facebook_scraper/constants.py        |   2 -
 facebook_scraper/extractors.py       | 130 ++++++++++-----------
 facebook_scraper/facebook_scraper.py |  15 ++--
 facebook_scraper/page_iterators.py   |   7 +-
 5 files changed, 60 insertions(+), 134 deletions(-)

diff --git a/facebook_scraper/__init__.py b/facebook_scraper/__init__.py
index c9eecdff..16008ddd 100644
--- a/facebook_scraper/__init__.py
+++ b/facebook_scraper/__init__.py
@@ -5,12 +5,11 @@
 import pathlib
 import sys
 import warnings
-import pickle
 from typing import Any, Dict, Iterator, Optional, Set, Union
 
 from requests.cookies import cookiejar_from_dict
 
-from .constants import DEFAULT_REQUESTS_TIMEOUT, DEFAULT_COOKIES_FILE_PATH
+from .constants import DEFAULT_REQUESTS_TIMEOUT
 from .facebook_scraper import FacebookScraper
 from .fb_types import Credentials, Post, RawPost, Profile
 from .utils import html_element_to_string, parse_cookie_file
@@ -300,6 +299,7 @@ def get_posts_by_search(
     credentials: Optional[Credentials] = None,
     **kwargs,
 ) -> Iterator[Post]:
+    """Get posts by searching all of Facebook
 
     Args:
         word (str): The word for searching posts.
@@ -517,42 +517,6 @@ def enable_logging(level=logging.DEBUG):
     logger.setLevel(level)
 
 
-def use_persistent_session(email: str, password: str, cookies_file_path=DEFAULT_COOKIES_FILE_PATH):
-    """Login persistently to Facebook and save cookies to a file (default: ".fb-cookies.pckl"). This is highly recommended if you want to scrape several times a day because it will keep your session alive instead of logging in every time (which can be flagged as suspicious by Facebook).
-
-    Args:
-        email (str): email address to login.
-        password (str): password to login.
-        cookies_file_path (str, optional): path to the file in which to save cookies. Defaults to ".fb-cookies.pckl".
-
-    Raises:
-        exceptions.InvalidCredentials: if the credentials are invalid.
-
-    Returns:
-        Boolean: True if the login was successful, False otherwise.
-    """
-    try:
-        with open(cookies_file_path, "rb") as f:
-            cookies = pickle.load(f)
-        logger.debug("Loaded cookies from %s", cookies_file_path)
-    except FileNotFoundError:
-        logger.error("No cookies file found at %s", cookies_file_path)
-        cookies = None
-    try:
-        if not cookies:
-            raise exceptions.InvalidCookies()
-        set_cookies(cookies)
-        logger.debug("Successfully logged in with cookies")
-    except exceptions.InvalidCookies:
-        logger.exception("Invalid cookies, trying to login with credentials")
-        _scraper.login(email, password)
-        cookies = _scraper.session.cookies
-        with open(cookies_file_path, "wb") as f:
-            pickle.dump(cookies, f)
-        set_cookies(cookies)
-        logger.debug("Successfully logged in with credentials")
-
-
 # Disable logging by default
 logger = logging.getLogger(__name__)
 logger.addHandler(logging.NullHandler())
diff --git a/facebook_scraper/constants.py b/facebook_scraper/constants.py
index e5e8bd3e..15f81f05 100644
--- a/facebook_scraper/constants.py
+++ b/facebook_scraper/constants.py
@@ -5,5 +5,3 @@
 DEFAULT_REQUESTS_TIMEOUT = 30
 
 DEFAULT_PAGE_LIMIT = 10
-
-DEFAULT_COOKIES_FILE_PATH = '.fb-cookies.pckl'
diff --git a/facebook_scraper/extractors.py b/facebook_scraper/extractors.py
index 5922663e..715dcaf8 100644
--- a/facebook_scraper/extractors.py
+++ b/facebook_scraper/extractors.py
@@ -8,7 +8,6 @@
 from typing import Any, Dict, Optional
 from urllib.parse import parse_qs, urlparse
 from tqdm.auto import tqdm
-from collections import defaultdict
 
 from . import utils, exceptions
 from .constants import FB_BASE_URL, FB_MOBILE_BASE_URL, FB_W3_BASE_URL
@@ -87,7 +86,6 @@ class PostExtractor:
     bad_json_key_regex = re.compile(r'(?P<prefix>[{,])(?P<key>\w+):')
 
     more_url_regex = re.compile(r'(?<=…\s)<a href="([^"]+)')
-    has_translation_regex = re.compile(r'<span.*>Rate Translation</span>')
     post_story_regex = re.compile(r'href="(\/story[^"]+)" aria')
 
     def __init__(self, element, options, request_fn, full_post_html=None):
@@ -140,7 +138,6 @@ def make_new_post(self) -> Post:
             'shared_time': None,
             'shared_user_id': None,
             'shared_username': None,
-            'shared_user_url': None,
             'shared_post_url': None,
             'available': None,
             'comments_full': None,
@@ -273,84 +270,59 @@ def extract_text(self) -> PartialPost:
         element = self.element
 
-        story_containers = element.find(".story_body_container")
-
         has_more = self.more_url_regex.search(element.html)
         if has_more and self.full_post_html:
             element = self.full_post_html.find('.story_body_container', first=True)
-            if not element and self.full_post_html.find("div.msg", first=True):
-                text = self.full_post_html.find("div.msg", first=True).text
-                return {"text": text, "post_text": text}
-
-
-        texts = defaultdict(str)
-
-        for container_index, container in enumerate(story_containers):
-
-            has_translation = self.has_translation_regex.search(container.html)
-            if has_translation:
-                original = container.find('div[style="display:none"]', first=True)
-                translation = utils.make_html_element(
-                    html=container.html.replace(original.html, "")
-                )
-                content_versions = [("hidden_original", original), ("translation", translation)]
-            else:
-                content_versions = [("original", container)]
+
+        nodes = element.find('p, header, span[role=presentation]')
+        if nodes and len(nodes) > 1:
+            post_text = []
+            shared_text = []
+            ended = False
+            index_non_header = next(
+                (i for i, node in enumerate(nodes) if node.tag != 'header'), 1
+            )
+            for node in nodes[index_non_header:]:
+                if node.tag == 'header':
+                    ended = True
+
+                # Remove '... 
More' + # This button is meant to display the hidden text that is already loaded + # Not to be confused with the 'More' that opens the article in a new page + if node.tag == 'p': + node = utils.make_html_element( + html=node.html.replace('>… <', '><', 1).replace('>More<', '', 1) + ) + + if not ended: + post_text.append(node.text) + else: + shared_text.append(node.text) # Separation between paragraphs paragraph_separator = '\n\n' - for version, content in content_versions: - post_text = [] - shared_text = [] - nodes = content.find('p, header, span[role=presentation]') - - if version == "hidden_original": - if container_index == 0: - post_text.append(content.text) - else: - shared_text.append(content.text) - - elif nodes: - ended = False - index_non_header = next( - (i for i, node in enumerate(nodes) if node.tag != 'header'), 1 + text = paragraph_separator.join(itertools.chain(post_text, shared_text)) + post_text = paragraph_separator.join(post_text) + shared_text = paragraph_separator.join(shared_text) + + original_text = None + hidden_div = element.find('div[style="display:none"]', first=True) + if hidden_div: + original_text = [] + for node in hidden_div.find("p,span[role=presentation]"): + node = utils.make_html_element( + html=node.html.replace('>… <', '><', 1).replace('>More<', '', 1) ) - for node in nodes[index_non_header:]: - if node.tag == 'header' or container_index > 0: - ended = True - - # Remove '... More' - # This button is meant to display the hidden text that is already loaded - # Not to be confused with the 'More' that opens the article in a new page - if node.tag == 'p': - node = utils.make_html_element( - html=node.html.replace('>… <', '><', 1).replace('>More<', '', 1) - ) - - if not ended: - post_text.append(node.text) - else: - shared_text.append(node.text) - - text = paragraph_separator.join(itertools.chain(post_text, shared_text)) - post_text = paragraph_separator.join(post_text) - shared_text = paragraph_separator.join(shared_text) - - if version in ["original", "hidden_original"]: - texts["text"] += text - texts["post_text"] += post_text - texts["shared_text"] += shared_text - if version == "translation": - texts["translated_text"] += text - texts["translated_post_text"] += post_text - texts["translated_shared_text"] += shared_text - - if texts: - if texts["translated_text"]: - texts["original_text"] = texts["text"] - return dict(texts) + original_text.append(node.text) + original_text = paragraph_separator.join(original_text) + return { + 'text': text, + 'post_text': post_text, + 'shared_text': shared_text, + 'original_text': original_text, + } elif element.find(".story_body_container>div", first=True): text = element.find(".story_body_container>div", first=True).text return {'text': text, 'post_text': text} @@ -358,9 +330,6 @@ def extract_text(self) -> PartialPost: text = nodes[0].text return {'text': text, 'post_text': text} - - - return None # TODO: Add the correct timezone @@ -1031,13 +1000,11 @@ def extract_share_information(self): ) # We can re-use the existing parsers, as a one level deep recursion shared_post = PostExtractor(raw_post, self.options, self.request) - shared_user_info = shared_post.extract_username() return { 'shared_post_id': self.data_ft["original_content_id"], 'shared_time': shared_post.extract_time().get("time"), 'shared_user_id': self.data_ft["original_content_owner_id"], - 'shared_username': shared_user_info.get("username"), - 'shared_user_url': shared_user_info.get("user_url"), + 'shared_username': 
shared_post.extract_username().get("username"), 'shared_post_url': shared_post.extract_post_url().get("post_url"), } @@ -1119,13 +1086,6 @@ def parse_comment(self, comment): reactions = self.extract_reactions(comment_id, force_parse_HTML=True) if comment_reactors_opt != "generator": reactions["reactors"] = utils.safe_consume(reactions.get("reactors", [])) - else: - reactions_count = comment.find('span._14va', first=True) - if reactions_count and len(reactions_count.text) > 0: - reactions_count = reactions_count.text - else: - reactions_count = None - reactions.update({"reaction_count": reactions_count}) return { "comment_id": comment_id, diff --git a/facebook_scraper/facebook_scraper.py b/facebook_scraper/facebook_scraper.py index d5dcc65e..85bb7dfa 100755 --- a/facebook_scraper/facebook_scraper.py +++ b/facebook_scraper/facebook_scraper.py @@ -52,7 +52,7 @@ class FacebookScraper: "Accept": "*/*", "Connection": "keep-alive", "Accept-Encoding": "gzip,deflate", - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15", + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/603.3.8 (KHTML, like Gecko) Version/10.1.2 Safari/603.3.8", } have_checked_locale = False @@ -757,10 +757,10 @@ def get_group_info(self, group, **kwargs) -> Profile: except: result["about"] = None - try: - url = members.find("a", first=True).attrs.get("href") - logger.debug(f"Requesting page from: {url}") + url = members.find("a", first=True).attrs.get("href") + logger.debug(f"Requesting page from: {url}") + try: resp = self.get(url).html url = resp.find("a[href*='listType=list_admin_moderator']", first=True) if kwargs.get("admins", True): @@ -959,10 +959,9 @@ def submit_form(self, response, extra_data={}): def login(self, email: str, password: str): response = self.get(self.base_url) - datr_cookie = re.search('(?<=_js_datr",")[^"]+', response.html.html) - if datr_cookie: - cookie_value = datr_cookie.group() - self.session.cookies.set('datr', cookie_value) + cookies_values = re.findall(r'js_datr","([^"]+)', response.html.html) + if len(cookies_values) == 1: + self.session.cookies.set("datr", cookies_values[0]) response = self.submit_form( response, {"email": email, "pass": password, "_fb_noscript": None} diff --git a/facebook_scraper/page_iterators.py b/facebook_scraper/page_iterators.py index cbe0b595..9925aa26 100644 --- a/facebook_scraper/page_iterators.py +++ b/facebook_scraper/page_iterators.py @@ -283,10 +283,15 @@ def get_next_page(self) -> Optional[URL]: if match: value = match.groups()[0] return value.encode('utf-8').decode('unicode_escape').replace('\\/', '/') + def get_page(self) -> Page: + try: + return super()._get_page('div._5rgr._5gh8._3-hy.async_like', 'article') + except: + return super()._get_page('article[data-ft*="top_level_post_id"]', 'article') class HashtagPageParser(PageParser): - cursor_regex = re.compile(r'(\/hashtag\/[a-z]+\/\?cursor=[^"]+).*$') + cursor_regex = re.compile(r'(\/hashtag\/[a-z]+\/\?locale=[a-z_A-Z]+&cursor=[^"]+).*$') def get_page(self) -> Page: return super()._get_page('article', 'article')
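
Note on the login() change above: the patched code pulls the "datr" cookie
value out of the login page markup with re.findall and only sets the cookie
when exactly one match is found. A minimal standalone sketch of that
extraction follows; the sample HTML fragment is hypothetical (the real page
embeds the value in an inline script), and only the regex and the
session.cookies.set("datr", ...) call come from the patch itself:

    import re
    import requests

    # Hypothetical fragment of the login page markup; the real page sets the
    # cookie via an inline script containing something like ["_js_datr","<value>",...]
    sample_html = '["_js_datr","AbCdEfGh123",86400,"/"]'

    # Same pattern as the patched login(): capture the quoted value that
    # follows the js_datr marker
    cookies_values = re.findall(r'js_datr","([^"]+)', sample_html)

    session = requests.Session()
    if len(cookies_values) == 1:
        # Only set the cookie when the match is unambiguous, as the patch does
        session.cookies.set("datr", cookies_values[0])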