diff --git a/yt_dlp/extractor/crunchyroll.py b/yt_dlp/extractor/crunchyroll.py index b52d4fe27ff0..095c04d3754d 100644 --- a/yt_dlp/extractor/crunchyroll.py +++ b/yt_dlp/extractor/crunchyroll.py @@ -1,5 +1,6 @@ import base64 import typing +import uuid from .common import InfoExtractor from ..networking.exceptions import HTTPError @@ -8,7 +9,7 @@ float_or_none, format_field, int_or_none, - join_nonempty, + jwt_decode_hs256, parse_age_limit, parse_count, parse_iso8601, @@ -27,6 +28,7 @@ class CrunchyrollBaseIE(InfoExtractor): _BASE_URL = 'https://www.crunchyroll.com' _API_BASE = 'https://api.crunchyroll.com' _NETRC_MACHINE = 'crunchyroll' + _ACCESS_TOKEN = None _AUTH_HEADERS = None _API_ENDPOINT = None _BASIC_AUTH = None @@ -48,6 +50,10 @@ class CrunchyrollBaseIE(InfoExtractor): @property def is_logged_in(self): return bool(self._get_cookies(self._BASE_URL).get('etp_rt')) + + @property + def is_premium(self): + return 'cr_premium' in jwt_decode_hs256(self._ACCESS_TOKEN).get('benefits', []) def _perform_login(self, username, password): if self.is_logged_in: @@ -89,9 +95,13 @@ def _update_auth(self): grant_type = 'etp_rt_cookie' if self.is_logged_in else 'client_id' try: + auth_headers = {'Authorization': CrunchyrollBaseIE._BASIC_AUTH} + if not self.is_logged_in: + auth_headers['ETP-Anonymous-ID'] = uuid.uuid4() + auth_response = self._download_json( f'{self._BASE_URL}/auth/v1/token', None, note=f'Authenticating with grant_type={grant_type}', - headers={'Authorization': CrunchyrollBaseIE._BASIC_AUTH}, data=f'grant_type={grant_type}'.encode()) + headers=auth_headers, data=f'grant_type={grant_type}'.encode()) except ExtractorError as error: if isinstance(error.cause, HTTPError) and error.cause.status == 403: raise ExtractorError( @@ -242,47 +252,30 @@ def get_meta_from_response(response): return results - def _extract_formats(self, stream_response, display_id=None): - requested_formats = self._configuration_arg('format') or ['vo_adaptive_hls'] - available_formats = {} - for stream_type, streams in traverse_obj( - stream_response, (('streams', ('data', 0)), {dict.items}, ...)): - if stream_type not in requested_formats: - continue - for stream in traverse_obj(streams, lambda _, v: v['url']): - hardsub_lang = stream.get('hardsub_locale') or '' - format_id = join_nonempty(stream_type, format_field(stream, 'hardsub_locale', 'hardsub-%s')) - available_formats[hardsub_lang] = (stream_type, format_id, hardsub_lang, stream['url']) + def _extract_stream(self, internal_id, is_music): + if is_music: + object_type = 'music/' + else: + object_type = '' + return self._download_json( + f'https://cr-play-service.prd.crunchyrollsvc.com/v1/{object_type}{internal_id}/console/switch/play', + internal_id, note='stream info', headers=CrunchyrollBaseIE._AUTH_HEADERS) + + def _extract_formats(self, stream_response, display_id=None): + available_formats = {'': ('', '', stream_response['url'])} + for hardsub_lang, stream in traverse_obj(stream_response, ('hardSubs', {dict.items}, ...)): + available_formats[hardsub_lang] = (f'hardsub-{hardsub_lang}', hardsub_lang, stream['url']) + requested_hardsubs = [('' if val == 'none' else val) for val in (self._configuration_arg('hardsub') or ['none'])] - if '' in available_formats and 'all' not in requested_hardsubs: - full_format_langs = set(requested_hardsubs) - self.to_screen( - 'To get all formats of a hardsub language, use ' - '"--extractor-args crunchyrollbeta:hardsub=". ' - 'See https://github.com/yt-dlp/yt-dlp#crunchyrollbeta-crunchyroll for more info', - only_once=True) - else: - full_format_langs = set(map(str.lower, available_formats)) - audio_locale = traverse_obj(stream_response, ((None, 'meta'), 'audio_locale'), get_all=False) + audio_locale = stream_response.get('audioLocale') hardsub_preference = qualities(requested_hardsubs[::-1]) formats = [] - for stream_type, format_id, hardsub_lang, stream_url in available_formats.values(): - if stream_type.endswith('hls'): - if hardsub_lang.lower() in full_format_langs: - adaptive_formats = self._extract_m3u8_formats( - stream_url, display_id, 'mp4', m3u8_id=format_id, - fatal=False, note=f'Downloading {format_id} HLS manifest') - else: - adaptive_formats = (self._m3u8_meta_format(stream_url, ext='mp4', m3u8_id=format_id),) - elif stream_type.endswith('dash'): - adaptive_formats = self._extract_mpd_formats( - stream_url, display_id, mpd_id=format_id, - fatal=False, note=f'Downloading {format_id} MPD manifest') - else: - self.report_warning(f'Encountered unknown stream_type: {stream_type!r}', display_id, only_once=True) - continue + for format_id, hardsub_lang, stream_url in available_formats.values(): + adaptive_formats = self._extract_mpd_formats( + stream_url, display_id, mpd_id=format_id, headers=CrunchyrollBaseIE._AUTH_HEADERS, + fatal=False, note=f'Downloading {f"hardsub {hardsub_lang} " if hardsub_lang else ""}MPD manifest') for f in adaptive_formats: if f.get('acodec') != 'none': f['language'] = audio_locale @@ -529,18 +522,12 @@ def entries(): else: raise ExtractorError(f'Unknown object type {object_type}') - # There might be multiple audio languages for one object (`_metadata.versions`), - # so we need to get the id from `streams_link` instead or we dont know which language to choose - streams_link = response.get('streams_link') - if not streams_link and traverse_obj(response, (f'{object_type}_metadata', 'is_premium_only')): + if traverse_obj(response, (f'{object_type}_metadata', 'is_premium_only')) and not self.is_premium: message = f'This {object_type} is for premium members only' - if self.is_logged_in: - raise ExtractorError(message, expected=True) self.raise_login_required(message) # We need go from unsigned to signed api to avoid getting soft banned - stream_response = self._call_cms_api_signed(remove_start( - streams_link, '/content/v2/cms/'), internal_id, lang, 'stream info') + stream_response = self._extract_stream(internal_id, False) result['formats'] = self._extract_formats(stream_response, internal_id) result['subtitles'] = self._extract_subtitles(stream_response) @@ -760,15 +747,12 @@ def _real_extract(self, url): if not response: raise ExtractorError(f'No video with id {internal_id} could be found (possibly region locked?)', expected=True) - streams_link = response.get('streams_link') - if not streams_link and response.get('isPremiumOnly'): + if response.get('isPremiumOnly') and not self.is_premium: message = f'This {response.get("type") or "media"} is for premium members only' - if self.is_logged_in: - raise ExtractorError(message, expected=True) self.raise_login_required(message) result = self._transform_music_response(response) - stream_response = self._call_api(streams_link, internal_id, lang, 'stream info') + stream_response = self._extract_stream(internal_id, True) result['formats'] = self._extract_formats(stream_response, internal_id) return result