From 4e618d075bab0e3c2e057b345710cb651e9a4cd8 Mon Sep 17 00:00:00 2001 From: Brian May Date: Fri, 30 Jun 2023 20:49:35 -0700 Subject: [PATCH 1/7] Instagram is mostly working Just need date from location searches --- snscrape/modules/instagram.py | 128 ++++++++++++++++++++++++++-------- 1 file changed, 98 insertions(+), 30 deletions(-) diff --git a/snscrape/modules/instagram.py b/snscrape/modules/instagram.py index 4973333..60a3c87 100644 --- a/snscrape/modules/instagram.py +++ b/snscrape/modules/instagram.py @@ -27,6 +27,7 @@ class InstagramPost(snscrape.base.Item): comments: int commentsDisabled: bool isVideo: bool + videoUrl: typing.Optional[str] def __str__(self): return self.url @@ -53,24 +54,27 @@ def __init__(self, **kwargs): super().__init__(**kwargs) self._headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'} self._initialPage = None + self._api_url = None def _response_to_items(self, response): - for node in response[self._responseContainer][self._edgeXToMedia]['edges']: + for node in response[self._edgeXToMedia]['edges']: code = node['node']['shortcode'] username = node['node']['owner']['username'] if 'username' in node['node']['owner'] else None url = f'https://www.instagram.com/p/{code}/' + yield InstagramPost( - url = url, - date = datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc), - content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None, - thumbnailUrl = node['node']['thumbnail_src'], - displayUrl = node['node']['display_url'], - username = username, - likes = node['node']['edge_media_preview_like']['count'], - comments = node['node']['edge_media_to_comment']['count'], - commentsDisabled = node['node']['comments_disabled'], - isVideo = node['node']['is_video'], - ) + url=url, + date=datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc), + content=node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None, + thumbnailUrl=node['node']['thumbnail_src'], + displayUrl=node['node']['display_url'], + username=username, + likes=node['node']['edge_media_preview_like']['count'], + comments=node['node']['edge_media_to_comment']['count'], + commentsDisabled=node['node']['comments_disabled'], + isVideo=node['node']['is_video'], + videoUrl=node['node']['video_url'] if 'video_url' in node['node'] else None, + ) def _initial_page(self): if self._initialPage is None: @@ -80,18 +84,29 @@ def _initial_page(self): raise snscrape.base.ScraperException(f'Got status code {r.status_code}') elif r.url.startswith('https://www.instagram.com/accounts/login/'): raise snscrape.base.ScraperException('Redirected to login page') + r = self._get( + self._api_url, + headers=self._headers, + responseOkCallback=self._check_json_callback + ) self._initialPage = r + return self._initialPage def _check_initial_page_callback(self, r): if r.status_code != 200: return True, None - jsonData = r.text.split('')[0] # May throw an IndexError if Instagram changes something again; we just let that bubble. - try: - obj = json.loads(jsonData) - except json.JSONDecodeError: - return False, 'invalid JSON' - r._snscrape_json_obj = obj + if (match := re.search( + r'\\"csrf_token\\":\\"([\da-zA-Z]+)\\",', + r.text)): + _logger.debug('Found csrf token in HTML') + self._headers['X-Csrftoken'] = match.group(1) + if (match := re.search( + r'"X-IG-App-ID":"(\d+)"', + r.text)): + _logger.debug('Found X-IG-App-ID token in HTML') + self._headers['X-IG-App-ID'] = match.group(1) + return True, None def _check_json_callback(self, r): @@ -112,25 +127,22 @@ def get_items(self): _logger.warning('Page does not exist') return response = r._snscrape_json_obj - rhxGis = response['rhx_gis'] if 'rhx_gis' in response else '' - if response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['count'] == 0: + if response['data'][self._responseContainer][self._edgeXToMedia]['count'] == 0: _logger.info('Page has no posts') return - if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['edges']: + if not response['data'][self._responseContainer][self._edgeXToMedia]['edges']: _logger.warning('Private account') return - pageID = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._pageIDKey] - yield from self._response_to_items(response['entry_data'][self._pageName][0]['graphql']) - if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']: + pageID = response['data'][self._responseContainer][self._pageIDKey] + yield from self._response_to_items(response['data'][self._responseContainer]) + if not response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']: return - endCursor = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor'] + endCursor = response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor'] headers = self._headers.copy() while True: _logger.info(f'Retrieving endCursor = {endCursor!r}') variables = self._variablesFormat.format(**locals()) - headers['X-Requested-With'] = 'XMLHttpRequest' - headers['X-Instagram-GIS'] = hashlib.md5(f'{rhxGis}:{variables}'.encode('utf-8')).hexdigest() r = self._get(f'https://www.instagram.com/graphql/query/?query_hash={self._queryHash}&variables={variables}', headers = headers, responseOkCallback = self._check_json_callback) if r.status_code != 200: @@ -139,7 +151,7 @@ def get_items(self): response = r._snscrape_json_obj if not response['data'][self._responseContainer][self._edgeXToMedia]['edges']: return - yield from self._response_to_items(response['data']) + yield from self._response_to_items(response['data'][self._responseContainer]) if not response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']: return endCursor = response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor'] @@ -157,6 +169,7 @@ def __init__(self, username, **kwargs): self._pageIDKey = 'id' self._queryHash = 'f2405b236d85e8296cf30347c9f08c2a' self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}' + self._api_url = f'https://www.instagram.com/api/v1/users/web_profile_info/?username={username}' def _get_entity(self): r = self._initial_page() @@ -212,6 +225,7 @@ def __init__(self, hashtag, **kwargs): self._pageIDKey = 'name' self._queryHash = 'f92f56d47dc7a55b606908374b43a314' self._variablesFormat = '{{"tag_name":"{pageID}","first":50,"after":"{endCursor}"}}' + self._api_url = f'https://www.instagram.com/api/v1/tags/logged_out_web_info/?tag_name={hashtag.lower()}' @classmethod def _cli_setup_parser(cls, subparser): @@ -229,11 +243,12 @@ def __init__(self, locationId, **kwargs): super().__init__(**kwargs) self._initialUrl = f'https://www.instagram.com/explore/locations/{locationId}/' self._pageName = 'LocationsPage' - self._responseContainer = 'location' + self._responseContainer = 'recent' self._edgeXToMedia = 'edge_location_to_media' - self._pageIDKey = 'id' + self._pageIDKey = 'next_page' self._queryHash = '1b84447a4d8b6d6d0426fefb34514485' self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}' + self._api_url = f"https://www.instagram.com/api/v1/locations/web_info/?location_id={locationId}" @classmethod def _cli_setup_parser(cls, subparser): @@ -242,3 +257,56 @@ def _cli_setup_parser(cls, subparser): @classmethod def _cli_from_args(cls, args): return cls._cli_construct(args, args.locationid) + + def get_items(self): + r = self._initial_page() + if r.status_code == 404: + _logger.warning('Page does not exist') + return + response = r._snscrape_json_obj + if len(response['native_location_data'][self._responseContainer]['sections']) == 0: + _logger.info('Page has no posts') + return + pageID = response['native_location_data'][self._responseContainer][self._pageIDKey] + yield from self._response_to_items(response['native_location_data'][self._responseContainer]) + if not response['native_location_data'][self._responseContainer]['more_available']: + return + endCursor = response['native_location_data'][self._responseContainer]['next_max_id'] + + headers = self._headers.copy() + while True: + _logger.info(f'Retrieving endCursor = {endCursor!r}') + variables = self._variablesFormat.format(**locals()) + r = self._get(f'https://www.instagram.com/graphql/query/?query_hash={self._queryHash}&variables={variables}', headers = headers, responseOkCallback = self._check_json_callback) + + if r.status_code != 200: + raise snscrape.base.ScraperException(f'Got status code {r.status_code}') + + response = r._snscrape_json_obj + if not response['native_location_data'][self._responseContainer]: + return + yield from self._response_to_items(response['native_location_data'][self._responseContainer]) + if not response['native_location_data'][self._responseContainer]['more_available']: + return + endCursor = response['native_location_data'][self._responseContainer]['next_max_id'] + + def _response_to_items(self, response): + for node in response['sections']: + for media in node['layout_content']['medias']: + code = media['media']['code'] + username = media['media']['user']['username'] if 'username' in media['media']['user'] else None + url = f'https://www.instagram.com/p/{code}/' + + yield InstagramPost( + url=url, + date=None, # datetime.datetime.fromtimestamp(media['media']['device_timestamp'], datetime.timezone.utc), + content=media['media']['caption']['text'] if media['media']['caption'] else None, + thumbnailUrl=media['media']['image_versions2']['candidates'][-1]['url'], + displayUrl=media['media']['image_versions2']['candidates'][0]['url'], + username=username, + likes=media['media']['like_count'], + comments=media['media']['comment_count'], + commentsDisabled=False, + isVideo=True if 'video_versions' in media['media'] else False, + videoUrl=media['media']['video_versions'][0]['url'] if 'video_versions' in media['media'] else None, + ) \ No newline at end of file From c857f00699011eb3624d03332c3d5c2bf9584a99 Mon Sep 17 00:00:00 2001 From: Brian May Date: Fri, 30 Jun 2023 21:26:08 -0700 Subject: [PATCH 2/7] Location Data has date now but can only query 1 page of data --- snscrape/modules/instagram.py | 37 +++++++++++++++++------------------ 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/snscrape/modules/instagram.py b/snscrape/modules/instagram.py index 60a3c87..0338ca7 100644 --- a/snscrape/modules/instagram.py +++ b/snscrape/modules/instagram.py @@ -271,24 +271,23 @@ def get_items(self): yield from self._response_to_items(response['native_location_data'][self._responseContainer]) if not response['native_location_data'][self._responseContainer]['more_available']: return - endCursor = response['native_location_data'][self._responseContainer]['next_max_id'] - - headers = self._headers.copy() - while True: - _logger.info(f'Retrieving endCursor = {endCursor!r}') - variables = self._variablesFormat.format(**locals()) - r = self._get(f'https://www.instagram.com/graphql/query/?query_hash={self._queryHash}&variables={variables}', headers = headers, responseOkCallback = self._check_json_callback) - - if r.status_code != 200: - raise snscrape.base.ScraperException(f'Got status code {r.status_code}') - - response = r._snscrape_json_obj - if not response['native_location_data'][self._responseContainer]: - return - yield from self._response_to_items(response['native_location_data'][self._responseContainer]) - if not response['native_location_data'][self._responseContainer]['more_available']: - return - endCursor = response['native_location_data'][self._responseContainer]['next_max_id'] + # endCursor = response['native_location_data'][self._responseContainer]['next_max_id'] + # headers = self._headers.copy() + # while True: + # _logger.info(f'Retrieving endCursor = {endCursor!r}') + # variables = self._variablesFormat.format(**locals()) + # r = self._get(f'https://www.instagram.com/graphql/query/?query_hash={self._queryHash}&variables={variables}', headers = headers, responseOkCallback = self._check_json_callback) + # + # if r.status_code != 200: + # raise snscrape.base.ScraperException(f'Got status code {r.status_code}') + # + # response = r._snscrape_json_obj + # if not response['data']['location']: + # return + # yield from self._response_to_items(response['native_location_data'][self._responseContainer]) + # if not response['native_location_data'][self._responseContainer]['more_available']: + # return + # endCursor = response['native_location_data'][self._responseContainer]['next_max_id'] def _response_to_items(self, response): for node in response['sections']: @@ -299,7 +298,7 @@ def _response_to_items(self, response): yield InstagramPost( url=url, - date=None, # datetime.datetime.fromtimestamp(media['media']['device_timestamp'], datetime.timezone.utc), + date=datetime.datetime.fromtimestamp(media['media']['taken_at'], datetime.timezone.utc), content=media['media']['caption']['text'] if media['media']['caption'] else None, thumbnailUrl=media['media']['image_versions2']['candidates'][-1]['url'], displayUrl=media['media']['image_versions2']['candidates'][0]['url'], From 288835715da71e60afca2c5087028618f19cc503 Mon Sep 17 00:00:00 2001 From: Brian May Date: Fri, 30 Jun 2023 23:07:43 -0700 Subject: [PATCH 3/7] Location only gets 1 page of data tsia --- snscrape/modules/instagram.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/snscrape/modules/instagram.py b/snscrape/modules/instagram.py index 0338ca7..d46f183 100644 --- a/snscrape/modules/instagram.py +++ b/snscrape/modules/instagram.py @@ -249,6 +249,7 @@ def __init__(self, locationId, **kwargs): self._queryHash = '1b84447a4d8b6d6d0426fefb34514485' self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}' self._api_url = f"https://www.instagram.com/api/v1/locations/web_info/?location_id={locationId}" + self._locationId = locationId @classmethod def _cli_setup_parser(cls, subparser): @@ -267,16 +268,31 @@ def get_items(self): if len(response['native_location_data'][self._responseContainer]['sections']) == 0: _logger.info('Page has no posts') return - pageID = response['native_location_data'][self._responseContainer][self._pageIDKey] + # pageID = response['native_location_data'][self._responseContainer][self._pageIDKey] yield from self._response_to_items(response['native_location_data'][self._responseContainer]) - if not response['native_location_data'][self._responseContainer]['more_available']: - return + + # querying for more data returns the login page, so 1 set of images is all we get + # if not response['native_location_data'][self._responseContainer]['more_available']: + # return # endCursor = response['native_location_data'][self._responseContainer]['next_max_id'] # headers = self._headers.copy() + # headers['X-Requested-With'] = 'XMLHttpRequest' + # # headers['X-Instagram-Ajax'] = 'XMLHttpRequest' # while True: # _logger.info(f'Retrieving endCursor = {endCursor!r}') - # variables = self._variablesFormat.format(**locals()) - # r = self._get(f'https://www.instagram.com/graphql/query/?query_hash={self._queryHash}&variables={variables}', headers = headers, responseOkCallback = self._check_json_callback) + # data = { + # 'surface': 'grid', + # 'tab': 'recent', + # 'max_id': endCursor, + # 'next_media_ids': [], + # 'page': pageID + # } + # r = self._post( + # f'https://www.instagram.com/api/v1/locations/{self._locationId}/sections/', + # headers=headers, + # data=data, + # responseOkCallback=self._check_json_callback + # ) # # if r.status_code != 200: # raise snscrape.base.ScraperException(f'Got status code {r.status_code}') From 3f01277156ec43f92f802a743b9252ba7841c15d Mon Sep 17 00:00:00 2001 From: Brian May Date: Fri, 30 Jun 2023 23:19:05 -0700 Subject: [PATCH 4/7] add id to InstagramPost class tsia --- snscrape/modules/instagram.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/snscrape/modules/instagram.py b/snscrape/modules/instagram.py index d46f183..2b178c2 100644 --- a/snscrape/modules/instagram.py +++ b/snscrape/modules/instagram.py @@ -28,6 +28,7 @@ class InstagramPost(snscrape.base.Item): commentsDisabled: bool isVideo: bool videoUrl: typing.Optional[str] + id: str def __str__(self): return self.url @@ -74,6 +75,7 @@ def _response_to_items(self, response): commentsDisabled=node['node']['comments_disabled'], isVideo=node['node']['is_video'], videoUrl=node['node']['video_url'] if 'video_url' in node['node'] else None, + id=node['node']['id'], ) def _initial_page(self): @@ -324,4 +326,5 @@ def _response_to_items(self, response): commentsDisabled=False, isVideo=True if 'video_versions' in media['media'] else False, videoUrl=media['media']['video_versions'][0]['url'] if 'video_versions' in media['media'] else None, - ) \ No newline at end of file + id=media['media']['id'], + ) From e693f97c38b53481d03dfcbb2be68fac9fb75433 Mon Sep 17 00:00:00 2001 From: Brian May Date: Sat, 1 Jul 2023 15:25:00 -0700 Subject: [PATCH 5/7] removed commented out code tsia --- snscrape/modules/instagram.py | 34 ---------------------------------- 1 file changed, 34 deletions(-) diff --git a/snscrape/modules/instagram.py b/snscrape/modules/instagram.py index 2b178c2..dde027d 100644 --- a/snscrape/modules/instagram.py +++ b/snscrape/modules/instagram.py @@ -270,42 +270,8 @@ def get_items(self): if len(response['native_location_data'][self._responseContainer]['sections']) == 0: _logger.info('Page has no posts') return - # pageID = response['native_location_data'][self._responseContainer][self._pageIDKey] yield from self._response_to_items(response['native_location_data'][self._responseContainer]) - # querying for more data returns the login page, so 1 set of images is all we get - # if not response['native_location_data'][self._responseContainer]['more_available']: - # return - # endCursor = response['native_location_data'][self._responseContainer]['next_max_id'] - # headers = self._headers.copy() - # headers['X-Requested-With'] = 'XMLHttpRequest' - # # headers['X-Instagram-Ajax'] = 'XMLHttpRequest' - # while True: - # _logger.info(f'Retrieving endCursor = {endCursor!r}') - # data = { - # 'surface': 'grid', - # 'tab': 'recent', - # 'max_id': endCursor, - # 'next_media_ids': [], - # 'page': pageID - # } - # r = self._post( - # f'https://www.instagram.com/api/v1/locations/{self._locationId}/sections/', - # headers=headers, - # data=data, - # responseOkCallback=self._check_json_callback - # ) - # - # if r.status_code != 200: - # raise snscrape.base.ScraperException(f'Got status code {r.status_code}') - # - # response = r._snscrape_json_obj - # if not response['data']['location']: - # return - # yield from self._response_to_items(response['native_location_data'][self._responseContainer]) - # if not response['native_location_data'][self._responseContainer]['more_available']: - # return - # endCursor = response['native_location_data'][self._responseContainer]['next_max_id'] def _response_to_items(self, response): for node in response['sections']: From 5e1656489ed2e21843ed02e5541c18f4bff17a8e Mon Sep 17 00:00:00 2001 From: Brian May Date: Sat, 1 Jul 2023 16:19:02 -0700 Subject: [PATCH 6/7] formatting changes to match repo style tsia --- snscrape/modules/instagram.py | 62 +++++++++++++++++------------------ 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/snscrape/modules/instagram.py b/snscrape/modules/instagram.py index dde027d..b72159c 100644 --- a/snscrape/modules/instagram.py +++ b/snscrape/modules/instagram.py @@ -55,7 +55,7 @@ def __init__(self, **kwargs): super().__init__(**kwargs) self._headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'} self._initialPage = None - self._api_url = None + self._apiUrl = None def _response_to_items(self, response): for node in response[self._edgeXToMedia]['edges']: @@ -64,18 +64,18 @@ def _response_to_items(self, response): url = f'https://www.instagram.com/p/{code}/' yield InstagramPost( - url=url, - date=datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc), - content=node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None, - thumbnailUrl=node['node']['thumbnail_src'], - displayUrl=node['node']['display_url'], - username=username, - likes=node['node']['edge_media_preview_like']['count'], - comments=node['node']['edge_media_to_comment']['count'], - commentsDisabled=node['node']['comments_disabled'], - isVideo=node['node']['is_video'], - videoUrl=node['node']['video_url'] if 'video_url' in node['node'] else None, - id=node['node']['id'], + url = url, + date = datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc), + content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None, + thumbnailUrl = node['node']['thumbnail_src'], + displayUrl = node['node']['display_url'], + username = username, + likes = node['node']['edge_media_preview_like']['count'], + comments = node['node']['edge_media_to_comment']['count'], + commentsDisabled = node['node']['comments_disabled'], + isVideo = node['node']['is_video'], + videoUrl = node['node']['video_url'] if 'video_url' in node['node'] else None, + id = node['node']['id'], ) def _initial_page(self): @@ -87,9 +87,9 @@ def _initial_page(self): elif r.url.startswith('https://www.instagram.com/accounts/login/'): raise snscrape.base.ScraperException('Redirected to login page') r = self._get( - self._api_url, - headers=self._headers, - responseOkCallback=self._check_json_callback + self._apiUrl, + headers = self._headers, + responseOkCallback = self._check_json_callback ) self._initialPage = r @@ -171,7 +171,7 @@ def __init__(self, username, **kwargs): self._pageIDKey = 'id' self._queryHash = 'f2405b236d85e8296cf30347c9f08c2a' self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}' - self._api_url = f'https://www.instagram.com/api/v1/users/web_profile_info/?username={username}' + self._apiUrl = f'https://www.instagram.com/api/v1/users/web_profile_info/?username={username}' def _get_entity(self): r = self._initial_page() @@ -227,7 +227,7 @@ def __init__(self, hashtag, **kwargs): self._pageIDKey = 'name' self._queryHash = 'f92f56d47dc7a55b606908374b43a314' self._variablesFormat = '{{"tag_name":"{pageID}","first":50,"after":"{endCursor}"}}' - self._api_url = f'https://www.instagram.com/api/v1/tags/logged_out_web_info/?tag_name={hashtag.lower()}' + self._apiUrl = f'https://www.instagram.com/api/v1/tags/logged_out_web_info/?tag_name={hashtag.lower()}' @classmethod def _cli_setup_parser(cls, subparser): @@ -250,7 +250,7 @@ def __init__(self, locationId, **kwargs): self._pageIDKey = 'next_page' self._queryHash = '1b84447a4d8b6d6d0426fefb34514485' self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}' - self._api_url = f"https://www.instagram.com/api/v1/locations/web_info/?location_id={locationId}" + self._apiUrl = f"https://www.instagram.com/api/v1/locations/web_info/?location_id={locationId}" self._locationId = locationId @classmethod @@ -281,16 +281,16 @@ def _response_to_items(self, response): url = f'https://www.instagram.com/p/{code}/' yield InstagramPost( - url=url, - date=datetime.datetime.fromtimestamp(media['media']['taken_at'], datetime.timezone.utc), - content=media['media']['caption']['text'] if media['media']['caption'] else None, - thumbnailUrl=media['media']['image_versions2']['candidates'][-1]['url'], - displayUrl=media['media']['image_versions2']['candidates'][0]['url'], - username=username, - likes=media['media']['like_count'], - comments=media['media']['comment_count'], - commentsDisabled=False, - isVideo=True if 'video_versions' in media['media'] else False, - videoUrl=media['media']['video_versions'][0]['url'] if 'video_versions' in media['media'] else None, - id=media['media']['id'], + url = url, + date = datetime.datetime.fromtimestamp(media['media']['taken_at'], datetime.timezone.utc), + content = media['media']['caption']['text'] if media['media']['caption'] else None, + thumbnailUrl = media['media']['image_versions2']['candidates'][-1]['url'], + displayUrl = media['media']['image_versions2']['candidates'][0]['url'], + username = username, + likes = media['media']['like_count'], + comments = media['media']['comment_count'], + commentsDisabled = False, + isVideo = True if 'video_versions' in media['media'] else False, + videoUrl = media['media']['video_versions'][0]['url'] if 'video_versions' in media['media'] else None, + id = media['media']['id'], ) From 68147f6eac7c6b8f54bb16b1078ed552cec0a06d Mon Sep 17 00:00:00 2001 From: Brian May Date: Wed, 5 Jul 2023 19:09:55 -0700 Subject: [PATCH 7/7] Add medium to InstagramPost add dataclasses Medium, Photo, Video and VideoVariant for Instagram as logged out users only get 1 media type per post this simplifies the class to use 1 property with different class values --- snscrape/modules/instagram.py | 69 ++++++++++++++++++++++++++++++----- 1 file changed, 59 insertions(+), 10 deletions(-) diff --git a/snscrape/modules/instagram.py b/snscrape/modules/instagram.py index b72159c..cc50e46 100644 --- a/snscrape/modules/instagram.py +++ b/snscrape/modules/instagram.py @@ -1,4 +1,14 @@ -__all__ = ['InstagramPost', 'User', 'InstagramUserScraper', 'InstagramHashtagScraper', 'InstagramLocationScraper'] +__all__ = [ + 'InstagramPost', + 'User', + 'Medium', + 'Photo', + 'VideoVariant', + 'Video', + 'InstagramUserScraper', + 'InstagramHashtagScraper', + 'InstagramLocationScraper' +] import dataclasses @@ -15,21 +25,50 @@ _logger = logging.getLogger(__name__) +class Medium: + pass + + +@dataclasses.dataclass +class Photo(Medium): + thumbnailUrl: str + fullUrl: str + altText: typing.Optional[str] = None + + +@dataclasses.dataclass +class VideoVariant: + url: str + width: int + height: int + contentType: typing.Optional[str] = None + + +@dataclasses.dataclass +class Video(Medium): + thumbnailUrl: str + variants: typing.List[VideoVariant] + duration: typing.Optional[float] = None + views: typing.Optional[int] = None + altText: typing.Optional[str] = None + + @dataclasses.dataclass class InstagramPost(snscrape.base.Item): url: str date: datetime.datetime content: typing.Optional[str] - thumbnailUrl: str - displayUrl: str username: typing.Optional[str] likes: int comments: int commentsDisabled: bool isVideo: bool - videoUrl: typing.Optional[str] + medium: typing.Union['Photo', 'Video'] id: str + thumbnailUrl = snscrape.base._DeprecatedProperty('thumbnailUrl', lambda self: self.medium.thumbnailUrl, 'medium.thumbnailUrl') + displayUrl = snscrape.base._DeprecatedProperty('displayUrl', lambda self: None if self.isVideo else self.medium.fullUrl, 'medium.fullUrl') + def __str__(self): return self.url @@ -63,18 +102,22 @@ def _response_to_items(self, response): username = node['node']['owner']['username'] if 'username' in node['node']['owner'] else None url = f'https://www.instagram.com/p/{code}/' + medium = Photo(node['node']['thumbnail_src'], node['node']['display_url']) + if node['node']['is_video']: + variants = [ + VideoVariant(url = node['node']['video_url'], width = node['node']['dimensions']['width'], height = node['node']['dimensions']['height']) + ] + medium = Video(thumbnailUrl = node['node']['thumbnail_src'], variants = variants, duration = int(node['node']['video_duration']) if 'video_duration' in node['node'] else None, views = node['node']['video_view_count']) yield InstagramPost( url = url, date = datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc), content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None, - thumbnailUrl = node['node']['thumbnail_src'], - displayUrl = node['node']['display_url'], username = username, likes = node['node']['edge_media_preview_like']['count'], comments = node['node']['edge_media_to_comment']['count'], commentsDisabled = node['node']['comments_disabled'], isVideo = node['node']['is_video'], - videoUrl = node['node']['video_url'] if 'video_url' in node['node'] else None, + medium = medium, id = node['node']['id'], ) @@ -280,17 +323,23 @@ def _response_to_items(self, response): username = media['media']['user']['username'] if 'username' in media['media']['user'] else None url = f'https://www.instagram.com/p/{code}/' + medium = Photo(media['media']['image_versions2']['candidates'][-1]['url'], media['media']['image_versions2']['candidates'][0]['url']) + if 'video_versions' in media['media']: + variants = [] + for version in media['media']['video_versions']: + variants.append(VideoVariant(url = version['url'], width = version['width'], height = version['height'])) + + medium = Video(thumbnailUrl = media['media']['image_versions2']['candidates'][-1]['url'], variants = variants, duration = int(media['media']['video_duration']) if 'video_duration' in media['media'] else None, views = media['media']['play_count'] if 'play_count' in media['media'] else None) + yield InstagramPost( url = url, date = datetime.datetime.fromtimestamp(media['media']['taken_at'], datetime.timezone.utc), content = media['media']['caption']['text'] if media['media']['caption'] else None, - thumbnailUrl = media['media']['image_versions2']['candidates'][-1]['url'], - displayUrl = media['media']['image_versions2']['candidates'][0]['url'], username = username, likes = media['media']['like_count'], comments = media['media']['comment_count'], commentsDisabled = False, isVideo = True if 'video_versions' in media['media'] else False, - videoUrl = media['media']['video_versions'][0]['url'] if 'video_versions' in media['media'] else None, + medium = medium, id = media['media']['id'], )