diff --git a/snscrape/modules/instagram.py b/snscrape/modules/instagram.py index 4973333..cc50e46 100644 --- a/snscrape/modules/instagram.py +++ b/snscrape/modules/instagram.py @@ -1,4 +1,14 @@ -__all__ = ['InstagramPost', 'User', 'InstagramUserScraper', 'InstagramHashtagScraper', 'InstagramLocationScraper'] +__all__ = [ + 'InstagramPost', + 'User', + 'Medium', + 'Photo', + 'VideoVariant', + 'Video', + 'InstagramUserScraper', + 'InstagramHashtagScraper', + 'InstagramLocationScraper' +] import dataclasses @@ -15,18 +25,49 @@ _logger = logging.getLogger(__name__) +class Medium: + pass + + +@dataclasses.dataclass +class Photo(Medium): + thumbnailUrl: str + fullUrl: str + altText: typing.Optional[str] = None + + +@dataclasses.dataclass +class VideoVariant: + url: str + width: int + height: int + contentType: typing.Optional[str] = None + + +@dataclasses.dataclass +class Video(Medium): + thumbnailUrl: str + variants: typing.List[VideoVariant] + duration: typing.Optional[float] = None + views: typing.Optional[int] = None + altText: typing.Optional[str] = None + + @dataclasses.dataclass class InstagramPost(snscrape.base.Item): url: str date: datetime.datetime content: typing.Optional[str] - thumbnailUrl: str - displayUrl: str username: typing.Optional[str] likes: int comments: int commentsDisabled: bool isVideo: bool + medium: typing.Union['Photo', 'Video'] + id: str + + thumbnailUrl = snscrape.base._DeprecatedProperty('thumbnailUrl', lambda self: self.medium.thumbnailUrl, 'medium.thumbnailUrl') + displayUrl = snscrape.base._DeprecatedProperty('displayUrl', lambda self: None if self.isVideo else self.medium.fullUrl, 'medium.fullUrl') def __str__(self): return self.url @@ -53,24 +94,32 @@ def __init__(self, **kwargs): super().__init__(**kwargs) self._headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'} self._initialPage = None + self._apiUrl = None def _response_to_items(self, response): - for node in response[self._responseContainer][self._edgeXToMedia]['edges']: + for node in response[self._edgeXToMedia]['edges']: code = node['node']['shortcode'] username = node['node']['owner']['username'] if 'username' in node['node']['owner'] else None url = f'https://www.instagram.com/p/{code}/' + + medium = Photo(node['node']['thumbnail_src'], node['node']['display_url']) + if node['node']['is_video']: + variants = [ + VideoVariant(url = node['node']['video_url'], width = node['node']['dimensions']['width'], height = node['node']['dimensions']['height']) + ] + medium = Video(thumbnailUrl = node['node']['thumbnail_src'], variants = variants, duration = int(node['node']['video_duration']) if 'video_duration' in node['node'] else None, views = node['node']['video_view_count']) yield InstagramPost( - url = url, - date = datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc), - content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None, - thumbnailUrl = node['node']['thumbnail_src'], - displayUrl = node['node']['display_url'], - username = username, - likes = node['node']['edge_media_preview_like']['count'], - comments = node['node']['edge_media_to_comment']['count'], - commentsDisabled = node['node']['comments_disabled'], - isVideo = node['node']['is_video'], - ) + url = url, + date = datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc), + content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None, + username = username, + likes = node['node']['edge_media_preview_like']['count'], + comments = node['node']['edge_media_to_comment']['count'], + commentsDisabled = node['node']['comments_disabled'], + isVideo = node['node']['is_video'], + medium = medium, + id = node['node']['id'], + ) def _initial_page(self): if self._initialPage is None: @@ -80,18 +129,29 @@ def _initial_page(self): raise snscrape.base.ScraperException(f'Got status code {r.status_code}') elif r.url.startswith('https://www.instagram.com/accounts/login/'): raise snscrape.base.ScraperException('Redirected to login page') + r = self._get( + self._apiUrl, + headers = self._headers, + responseOkCallback = self._check_json_callback + ) self._initialPage = r + return self._initialPage def _check_initial_page_callback(self, r): if r.status_code != 200: return True, None - jsonData = r.text.split('')[0] # May throw an IndexError if Instagram changes something again; we just let that bubble. - try: - obj = json.loads(jsonData) - except json.JSONDecodeError: - return False, 'invalid JSON' - r._snscrape_json_obj = obj + if (match := re.search( + r'\\"csrf_token\\":\\"([\da-zA-Z]+)\\",', + r.text)): + _logger.debug('Found csrf token in HTML') + self._headers['X-Csrftoken'] = match.group(1) + if (match := re.search( + r'"X-IG-App-ID":"(\d+)"', + r.text)): + _logger.debug('Found X-IG-App-ID token in HTML') + self._headers['X-IG-App-ID'] = match.group(1) + return True, None def _check_json_callback(self, r): @@ -112,25 +172,22 @@ def get_items(self): _logger.warning('Page does not exist') return response = r._snscrape_json_obj - rhxGis = response['rhx_gis'] if 'rhx_gis' in response else '' - if response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['count'] == 0: + if response['data'][self._responseContainer][self._edgeXToMedia]['count'] == 0: _logger.info('Page has no posts') return - if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['edges']: + if not response['data'][self._responseContainer][self._edgeXToMedia]['edges']: _logger.warning('Private account') return - pageID = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._pageIDKey] - yield from self._response_to_items(response['entry_data'][self._pageName][0]['graphql']) - if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']: + pageID = response['data'][self._responseContainer][self._pageIDKey] + yield from self._response_to_items(response['data'][self._responseContainer]) + if not response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']: return - endCursor = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor'] + endCursor = response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor'] headers = self._headers.copy() while True: _logger.info(f'Retrieving endCursor = {endCursor!r}') variables = self._variablesFormat.format(**locals()) - headers['X-Requested-With'] = 'XMLHttpRequest' - headers['X-Instagram-GIS'] = hashlib.md5(f'{rhxGis}:{variables}'.encode('utf-8')).hexdigest() r = self._get(f'https://www.instagram.com/graphql/query/?query_hash={self._queryHash}&variables={variables}', headers = headers, responseOkCallback = self._check_json_callback) if r.status_code != 200: @@ -139,7 +196,7 @@ def get_items(self): response = r._snscrape_json_obj if not response['data'][self._responseContainer][self._edgeXToMedia]['edges']: return - yield from self._response_to_items(response['data']) + yield from self._response_to_items(response['data'][self._responseContainer]) if not response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']: return endCursor = response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor'] @@ -157,6 +214,7 @@ def __init__(self, username, **kwargs): self._pageIDKey = 'id' self._queryHash = 'f2405b236d85e8296cf30347c9f08c2a' self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}' + self._apiUrl = f'https://www.instagram.com/api/v1/users/web_profile_info/?username={username}' def _get_entity(self): r = self._initial_page() @@ -212,6 +270,7 @@ def __init__(self, hashtag, **kwargs): self._pageIDKey = 'name' self._queryHash = 'f92f56d47dc7a55b606908374b43a314' self._variablesFormat = '{{"tag_name":"{pageID}","first":50,"after":"{endCursor}"}}' + self._apiUrl = f'https://www.instagram.com/api/v1/tags/logged_out_web_info/?tag_name={hashtag.lower()}' @classmethod def _cli_setup_parser(cls, subparser): @@ -229,11 +288,13 @@ def __init__(self, locationId, **kwargs): super().__init__(**kwargs) self._initialUrl = f'https://www.instagram.com/explore/locations/{locationId}/' self._pageName = 'LocationsPage' - self._responseContainer = 'location' + self._responseContainer = 'recent' self._edgeXToMedia = 'edge_location_to_media' - self._pageIDKey = 'id' + self._pageIDKey = 'next_page' self._queryHash = '1b84447a4d8b6d6d0426fefb34514485' self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}' + self._apiUrl = f"https://www.instagram.com/api/v1/locations/web_info/?location_id={locationId}" + self._locationId = locationId @classmethod def _cli_setup_parser(cls, subparser): @@ -242,3 +303,43 @@ def _cli_setup_parser(cls, subparser): @classmethod def _cli_from_args(cls, args): return cls._cli_construct(args, args.locationid) + + def get_items(self): + r = self._initial_page() + if r.status_code == 404: + _logger.warning('Page does not exist') + return + response = r._snscrape_json_obj + if len(response['native_location_data'][self._responseContainer]['sections']) == 0: + _logger.info('Page has no posts') + return + yield from self._response_to_items(response['native_location_data'][self._responseContainer]) + # querying for more data returns the login page, so 1 set of images is all we get + + def _response_to_items(self, response): + for node in response['sections']: + for media in node['layout_content']['medias']: + code = media['media']['code'] + username = media['media']['user']['username'] if 'username' in media['media']['user'] else None + url = f'https://www.instagram.com/p/{code}/' + + medium = Photo(media['media']['image_versions2']['candidates'][-1]['url'], media['media']['image_versions2']['candidates'][0]['url']) + if 'video_versions' in media['media']: + variants = [] + for version in media['media']['video_versions']: + variants.append(VideoVariant(url = version['url'], width = version['width'], height = version['height'])) + + medium = Video(thumbnailUrl = media['media']['image_versions2']['candidates'][-1]['url'], variants = variants, duration = int(media['media']['video_duration']) if 'video_duration' in media['media'] else None, views = media['media']['play_count'] if 'play_count' in media['media'] else None) + + yield InstagramPost( + url = url, + date = datetime.datetime.fromtimestamp(media['media']['taken_at'], datetime.timezone.utc), + content = media['media']['caption']['text'] if media['media']['caption'] else None, + username = username, + likes = media['media']['like_count'], + comments = media['media']['comment_count'], + commentsDisabled = False, + isVideo = True if 'video_versions' in media['media'] else False, + medium = medium, + id = media['media']['id'], + )