Skip to content

Commit

Permalink
Update crunchyroll.py
Browse files Browse the repository at this point in the history
  • Loading branch information
Jules-A authored Apr 4, 2024
1 parent 8970996 commit ac4d246
Showing 1 changed file with 35 additions and 51 deletions.
86 changes: 35 additions & 51 deletions yt_dlp/extractor/crunchyroll.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import base64
import typing
import uuid

from .common import InfoExtractor
from ..networking.exceptions import HTTPError
Expand All @@ -8,7 +9,7 @@
float_or_none,
format_field,
int_or_none,
join_nonempty,
jwt_decode_hs256,
parse_age_limit,
parse_count,
parse_iso8601,
Expand All @@ -27,6 +28,7 @@ class CrunchyrollBaseIE(InfoExtractor):
_BASE_URL = 'https://www.crunchyroll.com'
_API_BASE = 'https://api.crunchyroll.com'
_NETRC_MACHINE = 'crunchyroll'
_ACCESS_TOKEN = None
_AUTH_HEADERS = None
_API_ENDPOINT = None
_BASIC_AUTH = None
Expand All @@ -48,6 +50,10 @@ class CrunchyrollBaseIE(InfoExtractor):
@property
def is_logged_in(self):
return bool(self._get_cookies(self._BASE_URL).get('etp_rt'))

@property
def is_premium(self):
return 'cr_premium' in jwt_decode_hs256(self._ACCESS_TOKEN).get('benefits', [])

def _perform_login(self, username, password):
if self.is_logged_in:
Expand Down Expand Up @@ -89,9 +95,13 @@ def _update_auth(self):

grant_type = 'etp_rt_cookie' if self.is_logged_in else 'client_id'
try:
auth_headers = {'Authorization': CrunchyrollBaseIE._BASIC_AUTH}
if not self.is_logged_in:
auth_headers['ETP-Anonymous-ID'] = uuid.uuid4()

auth_response = self._download_json(
f'{self._BASE_URL}/auth/v1/token', None, note=f'Authenticating with grant_type={grant_type}',
headers={'Authorization': CrunchyrollBaseIE._BASIC_AUTH}, data=f'grant_type={grant_type}'.encode())
headers=auth_headers, data=f'grant_type={grant_type}'.encode())
except ExtractorError as error:
if isinstance(error.cause, HTTPError) and error.cause.status == 403:
raise ExtractorError(
Expand Down Expand Up @@ -242,47 +252,30 @@ def get_meta_from_response(response):

return results

def _extract_formats(self, stream_response, display_id=None):
requested_formats = self._configuration_arg('format') or ['vo_adaptive_hls']
available_formats = {}
for stream_type, streams in traverse_obj(
stream_response, (('streams', ('data', 0)), {dict.items}, ...)):
if stream_type not in requested_formats:
continue
for stream in traverse_obj(streams, lambda _, v: v['url']):
hardsub_lang = stream.get('hardsub_locale') or ''
format_id = join_nonempty(stream_type, format_field(stream, 'hardsub_locale', 'hardsub-%s'))
available_formats[hardsub_lang] = (stream_type, format_id, hardsub_lang, stream['url'])
def _extract_stream(self, internal_id, is_music):
if is_music:
object_type = 'music/'
else:
object_type = ''

return self._download_json(
f'https://cr-play-service.prd.crunchyrollsvc.com/v1/{object_type}{internal_id}/console/switch/play',
internal_id, note='stream info', headers=CrunchyrollBaseIE._AUTH_HEADERS)

def _extract_formats(self, stream_response, display_id=None):
available_formats = {'': ('', '', stream_response['url'])}
for hardsub_lang, stream in traverse_obj(stream_response, ('hardSubs', {dict.items}, ...)):
available_formats[hardsub_lang] = (f'hardsub-{hardsub_lang}', hardsub_lang, stream['url'])

requested_hardsubs = [('' if val == 'none' else val) for val in (self._configuration_arg('hardsub') or ['none'])]
if '' in available_formats and 'all' not in requested_hardsubs:
full_format_langs = set(requested_hardsubs)
self.to_screen(
'To get all formats of a hardsub language, use '
'"--extractor-args crunchyrollbeta:hardsub=<language_code or all>". '
'See https://github.com/yt-dlp/yt-dlp#crunchyrollbeta-crunchyroll for more info',
only_once=True)
else:
full_format_langs = set(map(str.lower, available_formats))

audio_locale = traverse_obj(stream_response, ((None, 'meta'), 'audio_locale'), get_all=False)
audio_locale = stream_response.get('audioLocale')
hardsub_preference = qualities(requested_hardsubs[::-1])
formats = []
for stream_type, format_id, hardsub_lang, stream_url in available_formats.values():
if stream_type.endswith('hls'):
if hardsub_lang.lower() in full_format_langs:
adaptive_formats = self._extract_m3u8_formats(
stream_url, display_id, 'mp4', m3u8_id=format_id,
fatal=False, note=f'Downloading {format_id} HLS manifest')
else:
adaptive_formats = (self._m3u8_meta_format(stream_url, ext='mp4', m3u8_id=format_id),)
elif stream_type.endswith('dash'):
adaptive_formats = self._extract_mpd_formats(
stream_url, display_id, mpd_id=format_id,
fatal=False, note=f'Downloading {format_id} MPD manifest')
else:
self.report_warning(f'Encountered unknown stream_type: {stream_type!r}', display_id, only_once=True)
continue
for format_id, hardsub_lang, stream_url in available_formats.values():
adaptive_formats = self._extract_mpd_formats(
stream_url, display_id, mpd_id=format_id, headers=CrunchyrollBaseIE._AUTH_HEADERS,
fatal=False, note=f'Downloading {f"hardsub {hardsub_lang} " if hardsub_lang else ""}MPD manifest')
for f in adaptive_formats:
if f.get('acodec') != 'none':
f['language'] = audio_locale
Expand Down Expand Up @@ -529,18 +522,12 @@ def entries():
else:
raise ExtractorError(f'Unknown object type {object_type}')

# There might be multiple audio languages for one object (`<object>_metadata.versions`),
# so we need to get the id from `streams_link` instead or we dont know which language to choose
streams_link = response.get('streams_link')
if not streams_link and traverse_obj(response, (f'{object_type}_metadata', 'is_premium_only')):
if traverse_obj(response, (f'{object_type}_metadata', 'is_premium_only')) and not self.is_premium:
message = f'This {object_type} is for premium members only'
if self.is_logged_in:
raise ExtractorError(message, expected=True)
self.raise_login_required(message)

# We need go from unsigned to signed api to avoid getting soft banned
stream_response = self._call_cms_api_signed(remove_start(
streams_link, '/content/v2/cms/'), internal_id, lang, 'stream info')
stream_response = self._extract_stream(internal_id, False)
result['formats'] = self._extract_formats(stream_response, internal_id)
result['subtitles'] = self._extract_subtitles(stream_response)

Expand Down Expand Up @@ -760,15 +747,12 @@ def _real_extract(self, url):
if not response:
raise ExtractorError(f'No video with id {internal_id} could be found (possibly region locked?)', expected=True)

streams_link = response.get('streams_link')
if not streams_link and response.get('isPremiumOnly'):
if response.get('isPremiumOnly') and not self.is_premium:
message = f'This {response.get("type") or "media"} is for premium members only'
if self.is_logged_in:
raise ExtractorError(message, expected=True)
self.raise_login_required(message)

result = self._transform_music_response(response)
stream_response = self._call_api(streams_link, internal_id, lang, 'stream info')
stream_response = self._extract_stream(internal_id, True)
result['formats'] = self._extract_formats(stream_response, internal_id)

return result
Expand Down

0 comments on commit ac4d246

Please sign in to comment.