Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made TorrentDef.load async and threaded #7666

Merged
merged 1 commit into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/tribler/core/components/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ def mock_dlmgr(state_dir):


@pytest.fixture
def video_tdef():
return TorrentDef.load(TESTS_DATA_DIR / 'video.avi.torrent')
async def video_tdef():
return await TorrentDef.load(TESTS_DATA_DIR / 'video.avi.torrent')


@pytest.fixture
Expand All @@ -90,7 +90,7 @@ async def video_seeder(tmp_path_factory, video_tdef):
dlmgr.initialize()
dscfg_seed = DownloadConfig()
dscfg_seed.set_dest_dir(TESTS_DATA_DIR)
upload = dlmgr.start_download(tdef=video_tdef, config=dscfg_seed)
upload = await dlmgr.start_download(tdef=video_tdef, config=dscfg_seed)
await upload.wait_for_status(DownloadStatus.SEEDING)
yield dlmgr
await dlmgr.shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ async def download_channel(self, channel):
dcfg.set_channel_download(True)
tdef = TorrentDef(metainfo=metainfo)

download = self.download_manager.start_download(tdef=tdef, config=dcfg, hidden=True)
download = await self.download_manager.start_download(tdef=tdef, config=dcfg, hidden=True)
try:
await download.future_finished
except CancelledError:
Expand All @@ -279,7 +279,8 @@ def _process_download():
if updated_channel:
self.notifier[notifications.channel_entity_updated](channel_dict)

def updated_my_channel(self, tdef):
@task
async def updated_my_channel(self, tdef):
"""
Notify the core that we updated our channel.
"""
Expand All @@ -293,7 +294,7 @@ def updated_my_channel(self, tdef):
dcfg = DownloadConfig(state_dir=self.state_dir)
dcfg.set_dest_dir(self.mds.channels_dir)
dcfg.set_channel_download(True)
return self.download_manager.start_download(tdef=tdef, config=dcfg)
return await self.download_manager.start_download(tdef=tdef, config=dcfg)

@db_session
def clean_unsubscribed_channels(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ def torrent_template():


@pytest.fixture
def personal_channel(metadata_store):
async def personal_channel(metadata_store):
global update_metainfo
tdef = await TorrentDef.load(TORRENT_UBUNTU_FILE)
with db_session:
chan = metadata_store.ChannelMetadata.create_channel(title="my test chan", description="test")
tdef = TorrentDef.load(TORRENT_UBUNTU_FILE)
chan.add_torrent_to_channel(tdef, None)
update_metainfo = chan.commit_channel_torrent()
return chan
Expand Down Expand Up @@ -108,11 +108,11 @@ async def mock_remove_download(download_obj, **_):
gigachannel_manager.updated_my_channel.assert_called_once()


def test_updated_my_channel(personal_channel, gigachannel_manager, tmpdir):
async def test_updated_my_channel(personal_channel, gigachannel_manager, tmpdir):
tdef = TorrentDef.load_from_dict(update_metainfo)
gigachannel_manager.download_manager.start_download = MagicMock()
gigachannel_manager.download_manager.start_download = AsyncMock()
gigachannel_manager.download_manager.download_exists = lambda *_: False
gigachannel_manager.updated_my_channel(tdef)
await gigachannel_manager.updated_my_channel(tdef)
gigachannel_manager.download_manager.start_download.assert_called_once()


Expand Down Expand Up @@ -388,7 +388,7 @@ def mock_get_metainfo_good(*args, **kwargs):

initiated_download = False

def mock_download_from_tdef(*_, **__):
async def mock_download_from_tdef(*_, **__):
global initiated_download
initiated_download = True
mock_dl = MockObject()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from shutil import rmtree
from typing import Callable, Dict, List, Optional

from ipv8.taskmanager import TaskManager, task
from ipv8.taskmanager import TaskManager

from tribler.core import notifications
from tribler.core.components.libtorrent.download_manager.dht_health_manager import DHTHealthManager
Expand Down Expand Up @@ -491,7 +491,7 @@ async def get_metainfo(self, infohash: bytes, timeout: float = 30, hops: Optiona
dcfg.set_upload_mode(True) # Upload mode should prevent libtorrent from creating files
dcfg.set_dest_dir(self.metadata_tmpdir)
try:
download = self.start_download(tdef=tdef, config=dcfg, hidden=True, checkpoint_disabled=True)
download = await self.start_download(tdef=tdef, config=dcfg, hidden=True, checkpoint_disabled=True)
except TypeError as e:
self._logger.warning(e)
if raise_errors:
Expand Down Expand Up @@ -550,7 +550,7 @@ async def start_download_from_uri(self, uri, config=None):

if scheme in (HTTP_SCHEME, HTTPS_SCHEME):
tdef = await TorrentDef.load_from_url(uri)
return self.start_download(tdef=tdef, config=config)
return await self.start_download(tdef=tdef, config=config)
if scheme == MAGNET_SCHEME:
name, infohash, _ = parse_magnetlink(uri)
if infohash is None:
Expand All @@ -559,14 +559,14 @@ async def start_download_from_uri(self, uri, config=None):
tdef = TorrentDef.load_from_dict(self.metainfo_cache[infohash]['meta_info'])
else:
tdef = TorrentDefNoMetainfo(infohash, "Unknown name" if name is None else name, url=uri)
return self.start_download(tdef=tdef, config=config)
return await self.start_download(tdef=tdef, config=config)
if scheme == FILE_SCHEME:
file = url_to_path(uri)
return self.start_download(torrent_file=file, config=config)
return await self.start_download(torrent_file=file, config=config)
raise Exception("invalid uri")

def start_download(self, torrent_file=None, tdef=None, config: DownloadConfig = None, checkpoint_disabled=False,
hidden=False) -> Download:
async def start_download(self, torrent_file=None, tdef=None, config: DownloadConfig = None,
checkpoint_disabled=False, hidden=False) -> Download:
self._logger.debug(f'Starting download: filename: {torrent_file}, torrent def: {tdef}')
if config is None:
config = DownloadConfig.from_defaults(self.download_defaults)
Expand All @@ -578,7 +578,7 @@ def start_download(self, torrent_file=None, tdef=None, config: DownloadConfig =
if torrent_file is None:
raise ValueError("Torrent file must be provided if tdef is not given")
# try to get the torrent from the given torrent file
tdef = TorrentDef.load(torrent_file)
tdef = await TorrentDef.load(torrent_file)

assert tdef is not None, "tdef MUST not be None after loading torrent"

Expand Down Expand Up @@ -617,10 +617,9 @@ def start_download(self, torrent_file=None, tdef=None, config: DownloadConfig =
if infohash not in self.metainfo_requests or self.metainfo_requests[infohash][0] == download:
self.downloads[infohash] = download
if not self.dummy_mode:
self.start_handle(download, atp)
await self.start_handle(download, atp)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that the start_handle method is an async method that returns a Future object as a result. Here, we wait only for the start_handle method itself and not for the future it returns. If we want to wait for the future returned from the method, we should write await (await self.start_handle(download, atp)).

As I understand it, it is not necessary to wait for the future result here. But then, the question is, should we return the future from the start_handle method at all? Currently, we never await for the future result returned from the start_handle method, so it may be cleaner to just return None instead of the future.

return download

@task
async def start_handle(self, download, atp):
atp_resume_data_skipped = atp.copy()
resume_data = atp.get('resume_data')
Expand Down Expand Up @@ -662,7 +661,6 @@ async def start_handle(self, download, atp):
except asyncio.TimeoutError:
self._logger.warning("Timeout waiting for libtorrent DHT getting enough peers")
ltsession.async_add_torrent(encode_atp(atp))
return await download.future_added

def get_libtorrent_version(self):
try:
Expand Down Expand Up @@ -768,7 +766,7 @@ async def update_hops(self, download, new_hops):
# If the user wants to change the hop count to 0, don't automatically bump this up to 1 anymore
config.set_safe_seeding(False)

self.start_download(tdef=download.tdef, config=config)
await self.start_download(tdef=download.tdef, config=config)

def update_trackers(self, infohash, trackers):
""" Update the trackers for a download.
Expand Down Expand Up @@ -862,13 +860,13 @@ async def load_checkpoints(self):
checkpoint_filenames = list(self.get_checkpoint_dir().glob('*.conf'))
self.checkpoints_count = len(checkpoint_filenames)
for i, filename in enumerate(checkpoint_filenames, start=1):
self.load_checkpoint(filename)
await self.load_checkpoint(filename)
self.checkpoints_loaded = i
await sleep(.01)
self.all_checkpoints_are_loaded = True
self._logger.info("Checkpoints are loaded")

def load_checkpoint(self, filename):
async def load_checkpoint(self, filename):
try:
config = DownloadConfig.load(filename)
except Exception:
Expand Down Expand Up @@ -910,7 +908,7 @@ def load_checkpoint(self, filename):
if self.download_exists(tdef.get_infohash()):
self._logger.info("Not resuming checkpoint because download has already been added")
else:
self.start_download(tdef=tdef, config=config)
await self.start_download(tdef=tdef, config=config)
except Exception:
self._logger.exception("Not resume checkpoint due to exception while adding download")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,6 @@ async def create_torrent(self, request):
download_config = DownloadConfig()
download_config.set_dest_dir(result['base_dir'])
download_config.set_hops(self.download_manager.download_defaults.number_hops)
self.download_manager.start_download(tdef=TorrentDef(metainfo_dict), config=download_config)
await self.download_manager.start_download(tdef=TorrentDef(metainfo_dict), config=download_config)

return RESTResponse(json.dumps({"torrent": base64.b64encode(result['metainfo']).decode('utf-8')}))
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ async def get_torrent_info(self, request):
if scheme == FILE_SCHEME:
file = url_to_path(uri)
try:
tdef = TorrentDef.load(file)
tdef = await TorrentDef.load(file)
metainfo = tdef.metainfo
except (FileNotFoundError, TypeError, ValueError, RuntimeError):
return RESTResponse({"error": f"error while decoding torrent file: {file}"},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
from asyncio import Future, gather, get_event_loop, sleep
from unittest.mock import MagicMock
from unittest.mock import MagicMock, AsyncMock

import pytest
from ipv8.util import succeed
Expand Down Expand Up @@ -65,7 +65,7 @@ async def test_get_metainfo_valid_metadata(fake_dlmgr):
download_impl.future_metainfo = succeed(metainfo)

fake_dlmgr.initialize()
fake_dlmgr.start_download = MagicMock(return_value=download_impl)
fake_dlmgr.start_download = AsyncMock(return_value=download_impl)
fake_dlmgr.download_defaults.number_hops = 1
fake_dlmgr.remove_download = MagicMock(return_value=succeed(None))

Expand All @@ -86,7 +86,7 @@ async def test_get_metainfo_add_fail(fake_dlmgr):
download_impl.tdef.get_metainfo = lambda: None

fake_dlmgr.initialize()
fake_dlmgr.start_download = MagicMock()
fake_dlmgr.start_download = AsyncMock()
fake_dlmgr.start_download.side_effect = TypeError
fake_dlmgr.download_defaults.number_hops = 1
fake_dlmgr.remove = MagicMock(return_value=succeed(None))
Expand All @@ -109,7 +109,7 @@ async def test_get_metainfo_duplicate_request(fake_dlmgr):
get_event_loop().call_later(0.1, download_impl.future_metainfo.set_result, metainfo)

fake_dlmgr.initialize()
fake_dlmgr.start_download = MagicMock(return_value=download_impl)
fake_dlmgr.start_download = AsyncMock(return_value=download_impl)
fake_dlmgr.download_defaults.number_hops = 1
fake_dlmgr.remove_download = MagicMock(return_value=succeed(None))

Expand All @@ -134,7 +134,7 @@ async def test_get_metainfo_with_already_added_torrent(fake_dlmgr):
Testing metainfo fetching for a torrent which is already in session.
"""
sample_torrent = TESTS_DATA_DIR / "bak_single.torrent"
torrent_def = TorrentDef.load(sample_torrent)
torrent_def = await TorrentDef.load(sample_torrent)

download_impl = MagicMock()
download_impl.future_metainfo = succeed(bencode(torrent_def.get_metainfo()))
Expand Down Expand Up @@ -164,10 +164,10 @@ async def test_start_download_while_getting_metainfo(fake_dlmgr):
fake_dlmgr.get_session = lambda *_: metainfo_session
fake_dlmgr.downloads[infohash] = metainfo_dl
fake_dlmgr.metainfo_requests[infohash] = [metainfo_dl, 1]
fake_dlmgr.remove_download = MagicMock(return_value=succeed(None))
fake_dlmgr.remove_download = AsyncMock(return_value=succeed(None))

tdef = TorrentDefNoMetainfo(infohash, 'name', f'magnet:?xt=urn:btih:{hexlify(infohash)}&')
download = fake_dlmgr.start_download(tdef=tdef, checkpoint_disabled=True)
download = await fake_dlmgr.start_download(tdef=tdef, checkpoint_disabled=True)
assert metainfo_dl != download
await sleep(.1)
assert fake_dlmgr.downloads[infohash] == download
Expand Down Expand Up @@ -199,7 +199,7 @@ async def test_start_download(fake_dlmgr):

fake_dlmgr.get_session = lambda *_: mock_ltsession

download = fake_dlmgr.start_download(tdef=TorrentDefNoMetainfo(infohash, ''), checkpoint_disabled=True)
download = await fake_dlmgr.start_download(tdef=TorrentDefNoMetainfo(infohash, ''), checkpoint_disabled=True)
handle = await download.get_handle()
assert handle == mock_handle
fake_dlmgr.downloads.clear()
Expand Down Expand Up @@ -249,14 +249,14 @@ async def test_start_download_existing_handle(fake_dlmgr):

fake_dlmgr.get_session = lambda *_: mock_ltsession

download = fake_dlmgr.start_download(tdef=TorrentDefNoMetainfo(infohash, 'name'), checkpoint_disabled=True)
download = await fake_dlmgr.start_download(tdef=TorrentDefNoMetainfo(infohash, 'name'), checkpoint_disabled=True)
handle = await download.get_handle()
assert handle == mock_handle
fake_dlmgr.downloads.clear()
await download.shutdown()


def test_start_download_existing_download(fake_dlmgr):
async def test_start_download_existing_download(fake_dlmgr):
"""
Testing the addition of a torrent to the libtorrent manager, if there is a pre-existing download.
"""
Expand All @@ -270,18 +270,18 @@ def test_start_download_existing_download(fake_dlmgr):
fake_dlmgr.downloads[infohash] = mock_download
fake_dlmgr.get_session = lambda *_: mock_ltsession

download = fake_dlmgr.start_download(tdef=TorrentDefNoMetainfo(infohash, 'name'), checkpoint_disabled=True)
download = await fake_dlmgr.start_download(tdef=TorrentDefNoMetainfo(infohash, 'name'), checkpoint_disabled=True)
assert download == mock_download
fake_dlmgr.downloads.clear()


def test_start_download_no_ti_url(fake_dlmgr):
async def test_start_download_no_ti_url(fake_dlmgr):
"""
Test whether a ValueError is raised if we try to add a torrent without infohash or url
"""
fake_dlmgr.initialize()
with pytest.raises(ValueError):
fake_dlmgr.start_download()
await fake_dlmgr.start_download()


def test_remove_unregistered_torrent(fake_dlmgr):
Expand Down Expand Up @@ -349,27 +349,27 @@ def test_post_session_stats(fake_dlmgr):
mock_lt_session.post_session_stats.assert_called_once()


def test_load_checkpoint(fake_dlmgr):
async def test_load_checkpoint(fake_dlmgr):
good = []

def mock_start_download(*_, **__):
async def mock_start_download(*_, **__):
good.append(1)

fake_dlmgr.start_download = mock_start_download

# Try opening real state file
state = TESTS_DATA_DIR / "config_files/13a25451c761b1482d3e85432f07c4be05ca8a56.conf"
fake_dlmgr.load_checkpoint(state)
await fake_dlmgr.load_checkpoint(state)
assert good

# Try opening nonexistent file
good = []
fake_dlmgr.load_checkpoint("nonexistent_file")
await fake_dlmgr.load_checkpoint("nonexistent_file")
assert not good

# Try opening corrupt file
config_file_path = TESTS_DATA_DIR / "config_files/corrupt_session_config.conf"
fake_dlmgr.load_checkpoint(config_file_path)
await fake_dlmgr.load_checkpoint(config_file_path)
assert not good


Expand All @@ -380,19 +380,19 @@ async def test_download_manager_start(fake_dlmgr):
assert fake_dlmgr.all_checkpoints_are_loaded


def test_load_empty_checkpoint(fake_dlmgr, tmpdir):
async def test_load_empty_checkpoint(fake_dlmgr, tmpdir):
"""
Test whether download resumes with faulty pstate file.
"""
fake_dlmgr.get_downloads_pstate_dir = lambda: tmpdir
fake_dlmgr.start_download = MagicMock()
fake_dlmgr.start_download = AsyncMock()

# Empty pstate file
pstate_filename = fake_dlmgr.get_downloads_pstate_dir() / 'abcd.state'
with open(pstate_filename, 'wb') as state_file:
state_file.write(b"")

fake_dlmgr.load_checkpoint(pstate_filename)
await fake_dlmgr.load_checkpoint(pstate_filename)
fake_dlmgr.start_download.assert_not_called()


Expand All @@ -401,7 +401,7 @@ async def test_load_checkpoints(fake_dlmgr, tmpdir):
Test whether we are resuming downloads after loading checkpoints
"""

def mocked_load_checkpoint(filename):
async def mocked_load_checkpoint(filename):
assert str(filename).endswith('abcd.conf')
mocked_load_checkpoint.called = True

Expand Down Expand Up @@ -442,8 +442,8 @@ async def mocked_update_hops(*_):
await readd_future


def test_get_downloads_by_name(fake_dlmgr):
dl = fake_dlmgr.start_download(torrent_file=TORRENT_UBUNTU_FILE, checkpoint_disabled=True)
async def test_get_downloads_by_name(fake_dlmgr):
dl = await fake_dlmgr.start_download(torrent_file=TORRENT_UBUNTU_FILE, checkpoint_disabled=True)
assert fake_dlmgr.get_downloads_by_name("ubuntu-15.04-desktop-amd64.iso")
assert not fake_dlmgr.get_downloads_by_name("ubuntu-15.04-desktop-amd64.iso", channels_only=True)
assert not fake_dlmgr.get_downloads_by_name("bla")
Expand Down
Loading