Skip to content

Commit

Permalink
Update crunchyroll.py
Browse files Browse the repository at this point in the history
  • Loading branch information
Jules-A authored Dec 30, 2023
1 parent 225cf2b commit a49d84a
Showing 1 changed file with 200 additions and 7 deletions.
207 changes: 200 additions & 7 deletions yt_dlp/extractor/crunchyroll.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import base64
import typing

from .common import InfoExtractor
from ..networking.exceptions import HTTPError
Expand All @@ -13,8 +14,10 @@
parse_iso8601,
qualities,
remove_start,
smuggle_url,
time_seconds,
traverse_obj,
unsmuggle_url,
url_or_none,
urlencode_postdata,
)
Expand Down Expand Up @@ -135,6 +138,110 @@ def _call_api(self, path, internal_id, lang, note='api', query={}):
raise ExtractorError(f'Unexpected response when downloading {note} JSON')
return result

def _get_requested_langs_from_extractor_args(self, ie_key=None):
return self._configuration_arg('language', ie_key=ie_key, casesense=False)

@staticmethod
def _format_audio_lang(audio_lang: str) -> str:
return audio_lang.casefold().strip()

@staticmethod
def _format_requested_langs(requested_langs):
# Requested Language options:
# 'default' = Include audio language of video with 'internal_id'
# 'unknown' = Include all unknown audio languages
# 'all' = Include all possible languages (will override all other options)
# <lang-code> = Include specific audio language e.g. 'ja-JP' or 'en-US'

# Format requested_langs
lang_formatter = CrunchyrollBaseIE._format_audio_lang
requested_langs = list(map(lang_formatter, requested_langs or ['default']))
if 'all' in requested_langs:
requested_langs = []
return requested_langs

@staticmethod
def _get_audio_langs_from_data(data):
lang_formatter = CrunchyrollBaseIE._format_audio_lang
wrap_in_list = lambda v: v and [v]

# Find audio languages in 'audio_locales' and 'audio_locale' and merge them into a case-folded set
audio_langs = set(traverse_obj(data, ('audio_locales', ..., {lang_formatter}), default=[]))
audio_langs.update(traverse_obj(data, ('audio_locale', {lang_formatter}, {wrap_in_list}), default=[]))
return audio_langs

@staticmethod
def _get_requested_lang_evaluator(requested_langs):
def is_requested_lang(audio_langs, allow_if_unknown=False):
audio_langs = audio_langs or {'unknown'}
# Is True if 'requested_langs' is empty ('all' languages were requested / are allowed)
# or if 'allow_if_unknown' is set and no language codes are specified in 'audio_langs'
# or if any of the specified languages in 'audio_langs' is in 'requested_langs'
return not requested_langs \
or (allow_if_unknown and 'unknown' in audio_langs) \
or any(la in requested_langs for la in audio_langs)
return is_requested_lang

@staticmethod
def _wrap_requested_lang_in_data_evaluator(requested_langs, is_requested_lang):
def is_requested_lang_in_data(data, allow_if_unknown=False):
# Is True if 'requested_langs' is empty ('all' languages were requested / are allowed)
# or if 'allow_if_unknown' is set and no language codes were found in 'data'
# or if any of the found languages from 'data' is in 'requested_langs'
return not requested_langs or is_requested_lang(
CrunchyrollBaseIE._get_audio_langs_from_data(data),
allow_if_unknown)
return is_requested_lang_in_data

@staticmethod
def _has_requested_default_lang(requested_langs):
return 'default' in requested_langs

def _get_responses_for_langs(self, internal_id, get_response_by_id, requested_langs):
requested_langs = self._format_requested_langs(requested_langs)
is_requested_lang = self._get_requested_lang_evaluator(requested_langs)
is_requested_lang_in_data = self._wrap_requested_lang_in_data_evaluator(
requested_langs, is_requested_lang)

def get_meta_from_response(response):
object_type = response.get('type')
return object_type and traverse_obj(response, f'{object_type}_metadata')

# Get default response and its metadata
default_response = get_response_by_id(internal_id)
if not default_response:
raise ExtractorError(
f'No video with id {internal_id} could be found (possibly region locked?)',
expected=True)
default_meta = get_meta_from_response(default_response)

# Result dict to store requested responses in
results = {}

# Add default response if requested
if self._has_requested_default_lang(requested_langs) \
or (default_meta and is_requested_lang_in_data(default_meta)):
results[internal_id] = default_response

# Iterate other versions and add them if requested
versions = (default_meta and traverse_obj(default_meta, 'versions')) or []
requested_versions = [version for version in versions
if version and is_requested_lang_in_data(version)]
for version in requested_versions:
version_id = version.get('guid')
if not version_id or version_id in results:
continue
# Fetch response for current version (will be on a different video page)
requested_response = get_response_by_id(version_id)
if not requested_response:
self.to_screen(
f'Requested video version with id {version_id} could not be found (possibly region locked?)',
only_once=True)
continue
results[version_id] = requested_response

return results

def _extract_formats(self, stream_response, display_id=None):
requested_formats = self._configuration_arg('format') or ['adaptive_hls']
available_formats = {}
Expand Down Expand Up @@ -192,6 +299,56 @@ def _extract_subtitles(self, data):

return subtitles

@staticmethod
def _extract_versions_and_merge_results(lang, internal_id, responses, extract_version):
# If only one language was requested, extract and return its version (no merge required)
if len(responses) == 1:
target_id, target_response = next(iter(responses.items()))
return extract_version(lang, target_id, target_response)

# If multiple languages were requested, extract all versions and merge them
# NOTE: Returned arguments such as 'title', 'season', 'season_id', etc. may differ
# from version to version. In each format, include what is different (compared to the
# main result). Choosing a format with '-f' now applies the correct arguments.
# ALSO: Differences in non-hashable arguments are not included in the formats.

# Extract main response from 'responses'. Favour the one with 'internal_id'
version_id, version_response = (internal_id, responses.pop(internal_id, None))
if not version_response:
# If 'internal_id' was excluded then use some other item. Its
# arguments are overridden when a format is selected anyway.
version_id, version_response = responses.popitem()

# Function to check whether an attribute (key value pair) is hashable
def is_attribute_hashable(attribute):
key, value = attribute
return isinstance(key, typing.Hashable) \
and isinstance(value, typing.Hashable)

# Extract main result (used to merge other results)
result = extract_version(lang, version_id, version_response)
result_formats = result.setdefault('formats', [])
result_subtitles = result.setdefault('subtitles', {})
result_as_set = set(filter(is_attribute_hashable, result.items()))

# Merge all formats and subtitles into main result
for version_id, version_response in responses.items():
version = extract_version(lang, version_id, version_response)
version_formats = version.get('formats') or []
version_subtitles = version.get('subtitles') or {}
version_as_set = set(filter(is_attribute_hashable, version.items()))

# Only add differences to every format
version_differences = dict(version_as_set - result_as_set)
for version_format in version_formats:
version_format.update(version_differences)

# Add version formats and subtitles to result
result_formats.extend(version_formats)
result_subtitles.update(version_subtitles)

# Return merged results
return result

class CrunchyrollCmsBaseIE(CrunchyrollBaseIE):
_API_ENDPOINT = 'cms'
Expand Down Expand Up @@ -327,14 +484,26 @@ class CrunchyrollBetaIE(CrunchyrollCmsBaseIE):
_RETURN_TYPE = 'video'

def _real_extract(self, url):
url, smuggled_data = unsmuggle_url(url, default={})
lang, internal_id = self._match_valid_url(url).group('lang', 'id')

# We need to use unsigned API call to allow ratings query string
response = traverse_obj(self._call_api(
f'objects/{internal_id}', internal_id, lang, 'object info', {'ratings': 'true'}), ('data', 0, {dict}))
if not response:
raise ExtractorError(f'No video with id {internal_id} could be found (possibly region locked?)', expected=True)

def get_response_by_id(requested_id):
# We need to use unsigned API call to allow ratings query string
return traverse_obj(self._call_api(
f'objects/{requested_id}', requested_id, lang, 'object info', {'ratings': 'true'}), ('data', 0, {dict}))

# Fetch requested language responses
requested_langs = traverse_obj(smuggled_data, ('target_audio_langs', {list}))
requested_langs = requested_langs or self._get_requested_langs_from_extractor_args()
responses = self._get_responses_for_langs(internal_id, get_response_by_id, requested_langs)
if not responses:
raise ExtractorError(
'None of the requested audio languages were found',
expected=True)
return self._extract_versions_and_merge_results(
lang, internal_id, responses, self._extract_version)

def _extract_version(self, lang, internal_id, response):
object_type = response.get('type')
if object_type == 'episode':
result = self._transform_episode_response(response)
Expand Down Expand Up @@ -474,14 +643,38 @@ class CrunchyrollBetaShowIE(CrunchyrollCmsBaseIE):
def _real_extract(self, url):
lang, internal_id = self._match_valid_url(url).group('lang', 'id')

# Fetch requested languages (for Crunchyroll 'single episode' extractor)
requested_langs = self._get_requested_langs_from_extractor_args(ie_key=CrunchyrollBetaIE.ie_key())
requested_langs = self._format_requested_langs(requested_langs)
if self._has_requested_default_lang(requested_langs):
# If default is specified, use the default behavior (i.e. all languages are extracted).
# An empty array means allow any language (the same as if 'all' were set).
requested_langs = []
is_requested_lang = self._get_requested_lang_evaluator(requested_langs)

def entries():
seasons_response = self._call_cms_api_signed(f'seasons?series_id={internal_id}', internal_id, lang, 'seasons')
for season in traverse_obj(seasons_response, ('items', ..., {dict})):
# Skip the extraction of a 'season' if its language is known, and it was not requested
# (If the language of 'season' is unknown then do not skip it -> Hence 'allow_if_unknown=True')
season_audio_langs = self._get_audio_langs_from_data(season)
if not is_requested_lang(season_audio_langs, allow_if_unknown=True):
continue

episodes_response = self._call_cms_api_signed(
f'episodes?season_id={season["id"]}', season["id"], lang, 'episode list')
for episode_response in traverse_obj(episodes_response, ('items', ..., {dict})):
# Skip the extraction of an episode if its language was not requested
episode_audio_langs = self._get_audio_langs_from_data(episode_response)
if not is_requested_lang(episode_audio_langs):
continue

smuggled_data = {
'target_audio_langs': list(episode_audio_langs) or ['default'],
}

yield self.url_result(
f'{self._BASE_URL}/{lang}watch/{episode_response["id"]}',
smuggle_url(f'{self._BASE_URL}/{lang}watch/{episode_response["id"]}', smuggled_data),
CrunchyrollBetaIE, **CrunchyrollBetaIE._transform_episode_response(episode_response))

return self.playlist_result(
Expand Down

0 comments on commit a49d84a

Please sign in to comment.