Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for Instagram #1001

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 134 additions & 33 deletions snscrape/modules/instagram.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
__all__ = ['InstagramPost', 'User', 'InstagramUserScraper', 'InstagramHashtagScraper', 'InstagramLocationScraper']
__all__ = [
'InstagramPost',
'User',
'Medium',
'Photo',
'VideoVariant',
'Video',
'InstagramUserScraper',
'InstagramHashtagScraper',
'InstagramLocationScraper'
]


import dataclasses
Expand All @@ -15,18 +25,49 @@
_logger = logging.getLogger(__name__)


class Medium:
pass


@dataclasses.dataclass
class Photo(Medium):
thumbnailUrl: str
fullUrl: str
altText: typing.Optional[str] = None


@dataclasses.dataclass
class VideoVariant:
url: str
width: int
height: int
contentType: typing.Optional[str] = None


@dataclasses.dataclass
class Video(Medium):
thumbnailUrl: str
variants: typing.List[VideoVariant]
duration: typing.Optional[float] = None
views: typing.Optional[int] = None
altText: typing.Optional[str] = None


@dataclasses.dataclass
class InstagramPost(snscrape.base.Item):
url: str
date: datetime.datetime
content: typing.Optional[str]
thumbnailUrl: str
displayUrl: str
username: typing.Optional[str]
likes: int
comments: int
commentsDisabled: bool
isVideo: bool
medium: typing.Union['Photo', 'Video']
id: str

thumbnailUrl = snscrape.base._DeprecatedProperty('thumbnailUrl', lambda self: self.medium.thumbnailUrl, 'medium.thumbnailUrl')
displayUrl = snscrape.base._DeprecatedProperty('displayUrl', lambda self: None if self.isVideo else self.medium.fullUrl, 'medium.fullUrl')

def __str__(self):
return self.url
Expand All @@ -53,24 +94,32 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)
self._headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
self._initialPage = None
self._apiUrl = None

def _response_to_items(self, response):
for node in response[self._responseContainer][self._edgeXToMedia]['edges']:
for node in response[self._edgeXToMedia]['edges']:
code = node['node']['shortcode']
username = node['node']['owner']['username'] if 'username' in node['node']['owner'] else None
url = f'https://www.instagram.com/p/{code}/'

medium = Photo(node['node']['thumbnail_src'], node['node']['display_url'])
if node['node']['is_video']:
variants = [
VideoVariant(url = node['node']['video_url'], width = node['node']['dimensions']['width'], height = node['node']['dimensions']['height'])
]
medium = Video(thumbnailUrl = node['node']['thumbnail_src'], variants = variants, duration = int(node['node']['video_duration']) if 'video_duration' in node['node'] else None, views = node['node']['video_view_count'])
yield InstagramPost(
url = url,
date = datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc),
content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None,
thumbnailUrl = node['node']['thumbnail_src'],
displayUrl = node['node']['display_url'],
username = username,
likes = node['node']['edge_media_preview_like']['count'],
comments = node['node']['edge_media_to_comment']['count'],
commentsDisabled = node['node']['comments_disabled'],
isVideo = node['node']['is_video'],
)
url = url,
date = datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc),
content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None,
username = username,
likes = node['node']['edge_media_preview_like']['count'],
comments = node['node']['edge_media_to_comment']['count'],
commentsDisabled = node['node']['comments_disabled'],
isVideo = node['node']['is_video'],
medium = medium,
id = node['node']['id'],
)

def _initial_page(self):
if self._initialPage is None:
Expand All @@ -80,18 +129,29 @@ def _initial_page(self):
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
elif r.url.startswith('https://www.instagram.com/accounts/login/'):
raise snscrape.base.ScraperException('Redirected to login page')
r = self._get(
self._apiUrl,
headers = self._headers,
responseOkCallback = self._check_json_callback
)
self._initialPage = r

return self._initialPage

def _check_initial_page_callback(self, r):
if r.status_code != 200:
return True, None
jsonData = r.text.split('<script type="text/javascript">window._sharedData = ')[1].split(';</script>')[0] # May throw an IndexError if Instagram changes something again; we just let that bubble.
try:
obj = json.loads(jsonData)
except json.JSONDecodeError:
return False, 'invalid JSON'
r._snscrape_json_obj = obj
if (match := re.search(
r'\\"csrf_token\\":\\"([\da-zA-Z]+)\\",',
r.text)):
_logger.debug('Found csrf token in HTML')
self._headers['X-Csrftoken'] = match.group(1)
if (match := re.search(
r'"X-IG-App-ID":"(\d+)"',
r.text)):
_logger.debug('Found X-IG-App-ID token in HTML')
self._headers['X-IG-App-ID'] = match.group(1)

return True, None

def _check_json_callback(self, r):
Expand All @@ -112,25 +172,22 @@ def get_items(self):
_logger.warning('Page does not exist')
return
response = r._snscrape_json_obj
rhxGis = response['rhx_gis'] if 'rhx_gis' in response else ''
if response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['count'] == 0:
if response['data'][self._responseContainer][self._edgeXToMedia]['count'] == 0:
_logger.info('Page has no posts')
return
if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['edges']:
if not response['data'][self._responseContainer][self._edgeXToMedia]['edges']:
_logger.warning('Private account')
return
pageID = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._pageIDKey]
yield from self._response_to_items(response['entry_data'][self._pageName][0]['graphql'])
if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
pageID = response['data'][self._responseContainer][self._pageIDKey]
yield from self._response_to_items(response['data'][self._responseContainer])
if not response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
return
endCursor = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']
endCursor = response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']

headers = self._headers.copy()
while True:
_logger.info(f'Retrieving endCursor = {endCursor!r}')
variables = self._variablesFormat.format(**locals())
headers['X-Requested-With'] = 'XMLHttpRequest'
headers['X-Instagram-GIS'] = hashlib.md5(f'{rhxGis}:{variables}'.encode('utf-8')).hexdigest()
r = self._get(f'https://www.instagram.com/graphql/query/?query_hash={self._queryHash}&variables={variables}', headers = headers, responseOkCallback = self._check_json_callback)

if r.status_code != 200:
Expand All @@ -139,7 +196,7 @@ def get_items(self):
response = r._snscrape_json_obj
if not response['data'][self._responseContainer][self._edgeXToMedia]['edges']:
return
yield from self._response_to_items(response['data'])
yield from self._response_to_items(response['data'][self._responseContainer])
if not response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
return
endCursor = response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']
Expand All @@ -157,6 +214,7 @@ def __init__(self, username, **kwargs):
self._pageIDKey = 'id'
self._queryHash = 'f2405b236d85e8296cf30347c9f08c2a'
self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}'
self._apiUrl = f'https://www.instagram.com/api/v1/users/web_profile_info/?username={username}'

def _get_entity(self):
r = self._initial_page()
Expand Down Expand Up @@ -212,6 +270,7 @@ def __init__(self, hashtag, **kwargs):
self._pageIDKey = 'name'
self._queryHash = 'f92f56d47dc7a55b606908374b43a314'
self._variablesFormat = '{{"tag_name":"{pageID}","first":50,"after":"{endCursor}"}}'
self._apiUrl = f'https://www.instagram.com/api/v1/tags/logged_out_web_info/?tag_name={hashtag.lower()}'

@classmethod
def _cli_setup_parser(cls, subparser):
Expand All @@ -229,11 +288,13 @@ def __init__(self, locationId, **kwargs):
super().__init__(**kwargs)
self._initialUrl = f'https://www.instagram.com/explore/locations/{locationId}/'
self._pageName = 'LocationsPage'
self._responseContainer = 'location'
self._responseContainer = 'recent'
self._edgeXToMedia = 'edge_location_to_media'
self._pageIDKey = 'id'
self._pageIDKey = 'next_page'
self._queryHash = '1b84447a4d8b6d6d0426fefb34514485'
self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}'
self._apiUrl = f"https://www.instagram.com/api/v1/locations/web_info/?location_id={locationId}"
self._locationId = locationId

@classmethod
def _cli_setup_parser(cls, subparser):
Expand All @@ -242,3 +303,43 @@ def _cli_setup_parser(cls, subparser):
@classmethod
def _cli_from_args(cls, args):
return cls._cli_construct(args, args.locationid)

def get_items(self):
r = self._initial_page()
if r.status_code == 404:
_logger.warning('Page does not exist')
return
response = r._snscrape_json_obj
if len(response['native_location_data'][self._responseContainer]['sections']) == 0:
_logger.info('Page has no posts')
return
yield from self._response_to_items(response['native_location_data'][self._responseContainer])
# querying for more data returns the login page, so 1 set of images is all we get

def _response_to_items(self, response):
for node in response['sections']:
for media in node['layout_content']['medias']:
code = media['media']['code']
username = media['media']['user']['username'] if 'username' in media['media']['user'] else None
url = f'https://www.instagram.com/p/{code}/'

medium = Photo(media['media']['image_versions2']['candidates'][-1]['url'], media['media']['image_versions2']['candidates'][0]['url'])
if 'video_versions' in media['media']:
variants = []
for version in media['media']['video_versions']:
variants.append(VideoVariant(url = version['url'], width = version['width'], height = version['height']))

medium = Video(thumbnailUrl = media['media']['image_versions2']['candidates'][-1]['url'], variants = variants, duration = int(media['media']['video_duration']) if 'video_duration' in media['media'] else None, views = media['media']['play_count'] if 'play_count' in media['media'] else None)

yield InstagramPost(
url = url,
date = datetime.datetime.fromtimestamp(media['media']['taken_at'], datetime.timezone.utc),
content = media['media']['caption']['text'] if media['media']['caption'] else None,
username = username,
likes = media['media']['like_count'],
comments = media['media']['comment_count'],
commentsDisabled = False,
isVideo = True if 'video_versions' in media['media'] else False,
medium = medium,
id = media['media']['id'],
)