Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

Commit

Permalink
Bring back task groups and backport taskgroups from 3.11.4
Browse files: browse the repository at this point in the history
  • Loading branch information
Jules-WinnfieldX committed Dec 16, 2023
1 parent f669907 commit ae07a83
Show file tree
Hide file tree
Showing 34 changed files with 454 additions and 349 deletions.
2 changes: 1 addition & 1 deletion cyberdrop_dl/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "5.0.123"
__version__ = "5.1.1"
70 changes: 26 additions & 44 deletions cyberdrop_dl/downloader/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from cyberdrop_dl.utils.utilities import CustomHTTPStatus, FILE_FORMATS, log

if TYPE_CHECKING:
from asyncio import Queue
from typing import Tuple

from cyberdrop_dl.clients.download_client import DownloadClient
Expand Down Expand Up @@ -84,63 +83,46 @@ def __init__(self, manager: Manager, domain: str):
self.manager: Manager = manager
self.domain: str = domain

self.complete = True

self.client: DownloadClient = field(init=False)
self.download_queue: Queue = field(init=False)

self._file_lock = manager.download_manager.file_lock
self._additional_headers = {}

self._unfinished_count = 0
self._current_attempt_filesize = {}
self._semaphore: asyncio.Semaphore = field(init=False)

self._lock = asyncio.Lock()
self._additional_headers = {}

self.processed_items: list = []
self.waiting_items = 0
self._current_attempt_filesize = {}

async def startup(self) -> None:
"""Starts the downloader"""
self.download_queue = await self.manager.queue_manager.get_download_queue(self.domain)
self.client = self.manager.client_manager.downloader_session
await self.set_additional_headers()
self._semaphore = asyncio.Semaphore(await self.manager.download_manager.get_download_limit(self.domain))

async def run_loop(self) -> None:
async def run(self, media_item: MediaItem) -> None:
"""Runs the download loop"""
while True:
media_item: MediaItem = await self.download_queue.get()
self.complete = False
self._unfinished_count += 1
media_item.current_attempt = 0

await self._lock.acquire()
if not (media_item.url.path in self.processed_items):
self.processed_items.append(media_item.url.path)
self._lock.release()
await self.manager.progress_manager.download_progress.update_total()

await log(f"Download Starting: {media_item.url}")
async with self.manager.client_manager.download_session_limit:
try:
await self.download(media_item)
except Exception as e:
await log(f"Download Failed: {media_item.url} with error {e}")
await log(traceback.format_exc())
await self.manager.progress_manager.download_stats_progress.add_failure("Unknown")
await self.manager.progress_manager.download_progress.add_failed()
self._unfinished_count -= 1
self.download_queue.task_done()
if self._unfinished_count == 0 and self.download_queue.empty():
self.complete = True
continue

self.waiting_items += 1
media_item.current_attempt = 0

await self._semaphore.acquire()
self.waiting_items -= 1
if not (media_item.url.path in self.processed_items):
self.processed_items.append(media_item.url.path)
await self.manager.progress_manager.download_progress.update_total()

await log(f"Download Starting: {media_item.url}")
async with self.manager.client_manager.download_session_limit:
try:
await self.download(media_item)
except Exception as e:
await log(f"Download Failed: {media_item.url} with error {e}")
await log(traceback.format_exc())
await self.manager.progress_manager.download_stats_progress.add_failure("Unknown")
await self.manager.progress_manager.download_progress.add_failed()
else:
await log(f"Download Finished: {media_item.url}")
else:
self._lock.release()
self.download_queue.task_done()
self._unfinished_count -= 1
if self._unfinished_count == 0 and self.download_queue.empty():
self.complete = True
self._semaphore.release()

"""~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"""

Expand Down
40 changes: 18 additions & 22 deletions cyberdrop_dl/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from cyberdrop_dl.managers.manager import Manager
from cyberdrop_dl.scraper.scraper import ScrapeMapper
from cyberdrop_dl.ui.ui import program_ui
from cyberdrop_dl.utils.backports.taskgroups import TaskGroup
from cyberdrop_dl.utils.sorting import Sorter
from cyberdrop_dl.utils.utilities import check_latest_pypi, log_with_color, check_partials_and_empty_folders, log

Expand Down Expand Up @@ -38,21 +39,11 @@ def startup() -> Manager:
async def runtime(manager: Manager) -> None:
"""Main runtime loop for the program, this will run until all scraping and downloading is complete"""
scrape_mapper = ScrapeMapper(manager)
download_manager = manager.download_manager
asyncio.create_task(scrape_mapper.map_urls())

if not manager.args_manager.retry:
await scrape_mapper.load_links()
else:
await scrape_mapper.load_failed_links()

# Check completion
await asyncio.sleep(1)
while True:
scraper_complete = await scrape_mapper.check_complete()
downloader_complete = await download_manager.check_complete()
if scraper_complete and downloader_complete:
break
# Run the scrape mapper inside a task group that is shared through the manager
async with TaskGroup() as task_group:
manager.task_group = task_group
await scrape_mapper.start()


async def director(manager: Manager) -> None:
Expand Down Expand Up @@ -91,9 +82,6 @@ async def director(manager: Manager) -> None:
try:
with Live(manager.progress_manager.layout, refresh_per_second=10):
await runtime(manager)
except (KeyboardInterrupt, SystemExit):
print("\nExiting...")
exit(1)
except Exception as e:
print("\nAn error occurred, please report this to the developer")
print(e)
Expand Down Expand Up @@ -131,11 +119,19 @@ async def director(manager: Manager) -> None:
def main():
manager = startup()

with contextlib.suppress(RuntimeError, asyncio.CancelledError):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
asyncio.run(director(manager))
sys.exit(0)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
with contextlib.suppress(RuntimeError):
try:
asyncio.run(director(manager))
except KeyboardInterrupt:
print("\nTrying to Exit...")
try:
asyncio.run(manager.close())
except Exception:
pass
exit(1)
sys.exit(0)


if __name__ == '__main__':
Expand Down
3 changes: 2 additions & 1 deletion cyberdrop_dl/managers/client_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import certifi
from aiohttp import ClientResponse
from aiolimiter import AsyncLimiter
from multidict import CIMultiDictProxy
from yarl import URL

from cyberdrop_dl.clients.download_client import DownloadClient
Expand Down Expand Up @@ -41,6 +40,8 @@ def __init__(self, manager: Manager):
self.domain_rate_limits = {
"bunkrr": AsyncLimiter(5, 1),
"cyberdrop": AsyncLimiter(5, 1),
"coomer": AsyncLimiter(10, 1),
"kemono": AsyncLimiter(10, 1),
"pixeldrain": AsyncLimiter(10, 1),
"other": AsyncLimiter(25, 1)
}
Expand Down
36 changes: 1 addition & 35 deletions cyberdrop_dl/managers/download_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from base64 import b64encode
from typing import TYPE_CHECKING

from cyberdrop_dl.downloader.downloader import Downloader
from cyberdrop_dl.utils.utilities import FILE_FORMATS

if TYPE_CHECKING:
Expand Down Expand Up @@ -40,33 +39,10 @@ class DownloadManager:
def __init__(self, manager: Manager):
self.manager = manager
self._download_instances: Dict = {}
self._download_instance_tasks: Dict = {}

self.file_lock = FileLock()

self.download_limits = {'bunkr': 1, 'bunkrr': 1, 'cyberdrop': 1, 'coomer': 8, 'cyberfile': 2, 'kemono': 8, "pixeldrain": 2}

async def check_complete(self) -> bool:
"""Checks if all download instances are complete"""
if not self._download_instances:
return True

keys = list(self._download_instances.keys())
for key in keys:
await self._download_instances[key].download_queue.join()

await asyncio.sleep(1)
keys = list(self._download_instances.keys())
for key in keys:
if not self._download_instances[key].download_queue.empty() or not self._download_instances[key].complete:
return False
return True

async def close(self) -> None:
"""Closes all download instances"""
for downloader in self._download_instance_tasks.values():
for task in downloader:
task.cancel()
self.download_limits = {'bunkr': 1, 'bunkrr': 1, 'cyberdrop': 1, 'coomer': 2, 'cyberfile': 2, 'kemono': 2, "pixeldrain": 2}

async def get_download_limit(self, key: str) -> int:
"""Returns the download limit for a domain"""
Expand All @@ -79,16 +55,6 @@ async def get_download_limit(self, key: str) -> int:
instances = self.manager.config_manager.global_settings_data['Rate_Limiting_Options']['max_simultaneous_downloads_per_domain']
return instances

async def get_download_instance(self, key: str) -> Downloader:
"""Returns a download instance"""
if key not in self._download_instances:
self._download_instances[key] = Downloader(self.manager, key)
await self._download_instances[key].startup()
self._download_instance_tasks[key] = []
for i in range(await self.get_download_limit(key)):
self._download_instance_tasks[key].append(asyncio.create_task(self._download_instances[key].run_loop()))
return self._download_instances[key]

async def basic_auth(self, username, password) -> str:
"""Returns a basic auth token"""
token = b64encode(f"{username}:{password}".encode('utf-8')).decode("ascii")
Expand Down
9 changes: 5 additions & 4 deletions cyberdrop_dl/managers/manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import copy
import json
from dataclasses import field
from pathlib import Path

from cyberdrop_dl import __version__
from cyberdrop_dl.managers.args_manager import ArgsManager
Expand All @@ -13,8 +12,8 @@
from cyberdrop_dl.managers.log_manager import LogManager
from cyberdrop_dl.managers.path_manager import PathManager
from cyberdrop_dl.managers.progress_manager import ProgressManager
from cyberdrop_dl.managers.queue_manager import QueueManager
from cyberdrop_dl.utils.args import config_definitions
from cyberdrop_dl.utils.backports.taskgroups import TaskGroup
from cyberdrop_dl.utils.dataclasses.supported_domains import SupportedDomains
from cyberdrop_dl.utils.transfer.first_time_setup import TransitionManager
from cyberdrop_dl.utils.utilities import log
Expand All @@ -27,7 +26,6 @@ def __init__(self):
self.path_manager: PathManager = field(init=False)
self.config_manager: ConfigManager = field(init=False)
self.log_manager: LogManager = field(init=False)
self.queue_manager: QueueManager = QueueManager(self)
self.db_manager: DBManager = field(init=False)
self.client_manager: ClientManager = field(init=False)
self.download_manager: DownloadManager = field(init=False)
Expand All @@ -38,6 +36,10 @@ def __init__(self):
self._loaded_args_config: bool = False
self._made_portable: bool = False

self.task_group: TaskGroup = field(init=False)
self.task_list: list = []
self.scrape_mapper = field(init=False)

def startup(self) -> None:
"""Startup process for the manager"""
self.args_startup()
Expand Down Expand Up @@ -167,4 +169,3 @@ async def args_logging(self) -> None:
async def close(self) -> None:
"""Closes the manager"""
await self.db_manager.close()
await self.download_manager.close()
Loading

0 comments on commit ae07a83

Please sign in to comment.