Skip to content

Commit

Permalink
chore: modularize downloader even more
Browse files Browse the repository at this point in the history
  • Loading branch information
Gaisberg authored and Gaisberg committed Aug 30, 2024
1 parent 909783f commit 67227a3
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 33 deletions.
44 changes: 21 additions & 23 deletions src/program/downloaders/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,19 @@ def validate(self):

def run(self, item: MediaItem):
    """Find a cached stream for *item*, download it, and yield the item back.

    Flow: collect the infohashes not already on the debrid service, probe
    them for cached availability, then download the first cached stream.
    On download failure the active stream is deleted and blacklisted; when
    nothing is cached every stream is blacklisted so it is not retried.
    NOTE(review): this span interleaved pre- and post-commit diff lines;
    reconstructed here as the post-commit version.
    """
    logger.debug(f"Running downloader for {item.log_string}")
    needed_media = get_needed_media(item)
    # Skip hashes the service already holds to avoid duplicate torrents.
    hashes = [stream.infohash for stream in item.streams if stream.infohash not in self.service.existing_hashes]
    cached_streams = self.get_cached_streams(hashes, needed_media)
    if cached_streams:
        # get_cached_streams stops on the first hit by default, so [0] is it.
        item.active_stream = cached_streams[0]
        try:
            self.download(item, item.active_stream)
        except Exception as e:
            logger.error(f"Failed to download {item.log_string}: {e}")
            self._delete_and_reset_active_stream(item)
    else:
        # Nothing cached anywhere: blacklist all candidate streams.
        for stream in item.streams:
            item.blacklist_stream(stream)
        logger.log("DEBRID", f"No cached torrents found for {item.log_string}")
    yield item

Expand All @@ -50,17 +51,11 @@ def _delete_and_reset_active_stream(self, item: MediaItem):
item.active_stream = {}
item.blacklist_stream(stream)

def is_cached(self, item: MediaItem) -> dict:
needed_media = get_needed_media(item)
hashes = [stream.infohash for stream in item.streams]
# Avoid duplicate torrents
for hash in hashes:
if hash in self.service.existing_hashes:
hashes.remove(hash)
def get_cached_streams(self, hashes: list[str], needed_media, break_on_first = True) -> dict:
chunks = [hashes[i:i + 5] for i in range(0, len(hashes), 5)]
# Using a list to share the state, booleans are immutable
break_pointer = [False]
result = {}
break_pointer = [False, break_on_first]
results = []

with ThreadPoolExecutor(thread_name_prefix="Dowloader") as executor:
futures = []
Expand All @@ -74,15 +69,18 @@ def is_cached(self, item: MediaItem) -> dict:
except CancelledError:
continue
if isinstance(_result, dict):
result = _result
for future in futures:
future.cancel()
results.append(_result)
if break_on_first:
for future in futures:
future.cancel()

if not result.get("infohash", False):
for stream in item.streams:
item.blacklist_stream(stream)
return results

return result
def download(self, item, active_stream: dict) -> str:
    """Download *active_stream* through the debrid service and update *item*.

    Args:
        item: the MediaItem being downloaded (attributes updated in place).
        active_stream: cached-stream dict, passed to the service backend.

    Returns:
        The torrent id reported by the service.

    Raises:
        Whatever the underlying service raises when the download fails
        (handled by the caller in run()).
    """
    torrent_id = self.service.download_cached(active_stream)
    torrent_names = self.service.get_torrent_names(torrent_id)
    update_item_attributes(item, torrent_names)
    logger.log("DEBRID", f"Downloaded {item.log_string}")
    # Fix: signature promises -> str but the body previously returned None.
    return torrent_id

def update_item_attributes(item: MediaItem, names: tuple[str, str]):
""" Update the item attributes with the downloaded files and active stream """
Expand Down
19 changes: 10 additions & 9 deletions src/program/downloaders/realdebrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,22 @@ def process_hashes(self, chunk: list[str], needed_media: dict, break_pointer: li
for infohash, container in cached_containers.items():
if container.get("matched_files"):
return {"infohash": infohash, **container}
return break_pointer[0]
return {}

def download_cached(self, active_stream: dict) -> str:
    """Add the cached torrent to Real-Debrid and select its files.

    Args:
        active_stream: dict with at least "infohash" and "all_files"
        (mapping of file ids to file info) — shape per the caller;
        TODO confirm against get_cached_streams.

    Returns:
        The Real-Debrid torrent id.

    Raises:
        Exception: when the torrent could not be added.

    NOTE(review): this span interleaved the pre-commit (item-based) and
    post-commit (active_stream-based) diff lines; reconstructed here as
    the post-commit version.
    """
    torrent_id = add_torrent(active_stream.get("infohash"))
    if torrent_id:
        # Remember the hash so later runs skip already-added torrents.
        self.existing_hashes.append(active_stream.get("infohash"))
        select_files(torrent_id, [file for file in active_stream.get("all_files").keys()])
        return torrent_id
    raise Exception("Failed to download torrent.")

def get_cached_containers(self, infohashes: list[str], needed_media: dict, break_pointer: list[bool] = [False]) -> dict:
def get_cached_containers(self, infohashes: list[str], needed_media: dict, break_pointer: list[bool]) -> dict:
cached_containers = {}
response = get_instant_availability(infohashes)

for infohash in infohashes:
if break_pointer[0]:
if break_pointer[1] and break_pointer[0]:
break
data = response.get(infohash, {})
if isinstance(data, list):
Expand All @@ -107,13 +107,14 @@ def all_files_valid(file_dict: dict) -> bool:
# Sort the container to have the longest length first
containers.sort(key=lambda x: len(x), reverse=True)
for container in containers:
if break_pointer[0]:
if break_pointer[1] and break_pointer[0]:
break
if all_files_valid(container):
cached_containers[infohash] = self.file_finder.get_cached_container(needed_media, break_pointer, container)
if cached_containers[infohash]:
break_pointer[0] = True
break
if break_pointer[1]:
break
return cached_containers

def get_torrent_names(self, id: str) -> dict:
Expand Down
2 changes: 1 addition & 1 deletion src/program/downloaders/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def cache_matches(self, cached_files: dict, needed_media: dict[int, list[int]],
matches_dict = {}

for file in cached_files.values():
if break_pointer[0]:
if break_pointer[1] and break_pointer[0]:
break
matched_season, matched_episodes = self.filename_matches_show(file[self.filename_attr])
if matched_season and matched_episodes:
Expand Down
1 change: 1 addition & 0 deletions src/program/media/item.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ def reset(self, soft_reset: bool = False):
for episode in self.episodes:
episode._reset(soft_reset)
self._reset(soft_reset)
self.store_state()

def _reset(self, soft_reset):
"""Reset item attributes for rescraping."""
Expand Down

0 comments on commit 67227a3

Please sign in to comment.