Skip to content

Commit

Permalink
Add support for adding pull-through content to associated repositories
Browse files Browse the repository at this point in the history
fixes: pulp#6201
  • Loading branch information
gerrod3 committed Jan 23, 2025
1 parent 13114a8 commit b794a25
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 6 deletions.
5 changes: 5 additions & 0 deletions CHANGES/plugin_api/6201.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Added the ability for plugins to dispatch a task that adds pull-through content to an associated repository.

Set the class variable `PULL_THROUGH_SUPPORTED = True` on the plugin's repository model to enable
this feature. Plugins can also customize the dispatched task by overriding the
`pull_through_add_content` method on their repository model.
28 changes: 28 additions & 0 deletions pulpcore/app/models/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class Repository(MasterModel):
TYPE = "repository"
CONTENT_TYPES = []
REMOTE_TYPES = []
PULL_THROUGH_SUPPORTED = False

name = models.TextField(db_index=True)
pulp_labels = HStoreField(default=dict)
Expand Down Expand Up @@ -345,6 +346,33 @@ def protected_versions(self):

return qs.distinct()

def pull_through_add_content(self, content_artifact):
    """
    Dispatch a task to add pull-through content to this repository.

    Defaults to adding the content associated with the passed in content_artifact to the
    repository. Plugins should override this method if more complex behavior is necessary,
    e.g. adding multiple associated content units in the same task.

    Args:
        content_artifact (pulpcore.app.models.ContentArtifact): the content artifact to add

    Returns:
        Optional(Task): Returns the dispatched task or None if nothing was done
    """
    content_pk = content_artifact.content_id
    if not content_pk:
        # No associated content, so there is nothing to add; skip the query entirely.
        return None

    # Look for a RepositoryContent row of this repository for the content whose
    # version_removed is unset; if one exists no task is dispatched.
    is_present = RepositoryContent.objects.filter(
        content__pk=content_pk, repository=self, version_removed__isnull=True
    ).exists()
    if is_present:
        return None

    # NOTE(review): imported locally, presumably to avoid a circular import with the
    # plugin API package -- confirm before moving to module level.
    from pulpcore.plugin.tasking import dispatch, add_and_remove

    body = {
        "repository_pk": self.pk,
        "add_content_units": [content_pk],
        "remove_content_units": [],
    }
    return dispatch(add_and_remove, kwargs=body, exclusive_resources=[self], immediate=True)

@hook(AFTER_UPDATE, when="retain_repo_versions", has_changed=True)
def _cleanup_old_versions_hook(self):
# Do not attempt to clean up anything, while there is a transaction involving repo versions
Expand Down
24 changes: 20 additions & 4 deletions pulpcore/content/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,7 @@ async def _match_and_stream(self, path, request):
repo_version = distro.repository_version

if repository:
repository = await repository.acast()
# Search for publication serving the latest (last complete) version
if not publication:
try:
Expand Down Expand Up @@ -759,8 +760,11 @@ async def _match_and_stream(self, path, request):
.filter(remote=remote, url=url)
.afirst()
):
# Try to stream the ContentArtifact if already created
ca = ra.content_artifact
# Try to add content to repository if present & supported
if repository and repository.PULL_THROUGH_SUPPORTED:
await sync_to_async(repository.pull_through_add_content)(ca)
# Try to stream the ContentArtifact if already created
if ca.artifact:
return await self._serve_content_artifact(ca, headers, request)
else:
Expand All @@ -780,6 +784,7 @@ async def _match_and_stream(self, path, request):
StreamResponse(headers=headers),
ra,
save_artifact=save_artifact,
repository=repository,
)
except ClientResponseError as ce:

Expand Down Expand Up @@ -934,15 +939,18 @@ def _save_artifact(self, download_result, remote_artifact, request=None):
# Now try to save RemoteArtifacts for each ContentArtifact
for ca in cas:
if url := remote.get_remote_artifact_url(ca.relative_path, request=request):
remote_artifact = RemoteArtifact(
ra = RemoteArtifact(
remote=remote, content_artifact=ca, url=url
)
try:
with transaction.atomic():
remote_artifact.save()
ra.save()
except IntegrityError:
# Remote artifact must have already been saved during a parallel request
log.info(f"RemoteArtifact for {url} already exists.")
if ca.relative_path == content_artifact.relative_path:
# Side effect used by pull-through-caching in _stream_remote_artifact
remote_artifact.content_artifact = ca

else:
# Normal on-demand downloading, update CA to point to new saved Artifact
Expand Down Expand Up @@ -1029,7 +1037,9 @@ def _build_url(**kwargs):
else:
raise NotImplementedError()

async def _stream_remote_artifact(self, request, response, remote_artifact, save_artifact=True):
async def _stream_remote_artifact(
self, request, response, remote_artifact, save_artifact=True, repository=None
):
"""
Stream and save a RemoteArtifact.
Expand All @@ -1039,6 +1049,8 @@ async def _stream_remote_artifact(self, request, response, remote_artifact, save
remote_artifact (pulpcore.plugin.models.RemoteArtifact) The RemoteArtifact
to fetch and then stream back to the client
save_artifact (bool): Override the save behavior on the streamed RemoteArtifact
repository (:class:`~pulpcore.plugin.models.Repository`): An optional repository to save
the content to if supported
Raises:
[aiohttp.web.HTTPNotFound][] when no
Expand Down Expand Up @@ -1176,6 +1188,10 @@ async def finalize():
await asyncio.shield(
sync_to_async(self._save_artifact)(download_result, remote_artifact, request)
)
# Try to add content to repository if present & supported
if repository and repository.PULL_THROUGH_SUPPORTED:
ca = remote_artifact.content_artifact
await sync_to_async(repository.pull_through_add_content)(ca)
await response.write_eof()

if response.status == 404:
Expand Down
49 changes: 47 additions & 2 deletions pulpcore/tests/unit/content/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
Distribution,
Remote,
RemoteArtifact,
Repository,
)


Expand Down Expand Up @@ -116,9 +117,15 @@ async def create_remote_artifact(remote, ca):
)


async def create_distribution(remote):
async def create_repository():
    """Create and return a Repository named with a freshly generated UUID."""
    unique_name = str(uuid.uuid4())
    return await Repository.objects.acreate(name=unique_name)


async def create_distribution(remote, repository=None):
    """
    Create and return a Distribution whose name and base_path are the same random UUID.

    Args:
        remote: the remote to attach to the distribution
        repository: an optional repository to attach to the distribution
    """
    name = str(uuid.uuid4())
    # Forward the repository so callers can build a distribution backed by one.
    return await Distribution.objects.acreate(
        name=name, base_path=name, remote=remote, repository=repository
    )


@pytest.mark.asyncio
Expand Down Expand Up @@ -267,3 +274,41 @@ def content_init(art, path):
artifacts = set(ca.content._artifacts.all())
assert len(artifacts) == 2
assert {artifact, artifact123} == artifacts


@pytest.mark.asyncio
@pytest.mark.django_db
async def test_pull_through_repository_add(request123, monkeypatch):
    """Check that content is only added to the repository when pull-through is supported."""
    stream_mock = AsyncMock()
    handler = Handler()
    handler._stream_content_artifact = stream_mock

    content = await create_content()
    ca = await create_content_artifact(content)
    remote = await create_remote()
    await create_remote_artifact(remote, ca)
    repo = await create_repository()

    add_content_mock = Mock()
    monkeypatch.setattr(Remote, "get_remote_artifact_content_type", Mock(return_value=Content))
    monkeypatch.setattr(Repository, "pull_through_add_content", add_content_mock)
    distro = await create_distribution(remote, repository=repo)
    path = f"{distro.base_path}/c123"

    try:
        # With the default Repository.PULL_THROUGH_SUPPORTED=False nothing is added
        await handler._match_and_stream(path, request123)
        stream_mock.assert_called_once()
        assert ca in stream_mock.call_args.args
        add_content_mock.assert_not_called()

        # After enabling PULL_THROUGH_SUPPORTED the method receives the CA
        monkeypatch.setattr(Repository, "PULL_THROUGH_SUPPORTED", True)
        stream_mock.reset_mock()
        await handler._match_and_stream(path, request123)
        stream_mock.assert_called_once()
        assert ca in stream_mock.call_args.args
        add_content_mock.assert_called_once()
        assert ca in add_content_mock.call_args.args
    finally:
        # Remove the created objects in the same order as before
        await content.adelete()
        await repo.adelete()
        await remote.adelete()
        await distro.adelete()

0 comments on commit b794a25

Please sign in to comment.