Skip to content

Commit

Permalink
upload_assets: refactor to something simpler and add initial tests (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Jul 26, 2024
1 parent 4990361 commit e3160f2
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 39 deletions.
77 changes: 38 additions & 39 deletions qa/tools/apex_algorithm_qa_tools/pytest_upload_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
```python
def test_dummy(upload_assets, tmp_path):
path = tmp_path / "dummy.txt"
path.write_text("dummy content")
path = tmp_path / "hello.txt"
path.write_text("Hello world.")
upload_assets(path)
```
Expand All @@ -33,7 +33,7 @@ def test_dummy(upload_assets, tmp_path):
import uuid
import warnings
from pathlib import Path
from typing import Callable, Dict, Union
from typing import Callable, Dict

import boto3
import pytest
Expand Down Expand Up @@ -80,6 +80,7 @@ def pytest_configure(config: pytest.Config):


def pytest_report_header(config):
# TODO Move inside S3UploadPlugin
plugin: S3UploadPlugin | None = config.pluginmanager.get_plugin(
_UPLOAD_ASSETS_PLUGIN_NAME
)
Expand All @@ -92,56 +93,54 @@ def pytest_unconfigure(config):
config.pluginmanager.unregister(name=_UPLOAD_ASSETS_PLUGIN_NAME)


class _Collector:
"""
Collects test outcomes and files to upload for a single test node.
"""

def __init__(self, nodeid: str) -> None:
self.nodeid = nodeid
self.outcomes: Dict[str, str] = {}
self.assets: Dict[str, Path] = {}

def set_outcome(self, when: str, outcome: str):
self.outcomes[when] = outcome

def collect(self, path: Path, name: str):
self.assets[name] = path


class S3UploadPlugin:
def __init__(self, *, run_id: str | None = None, s3_client, bucket: str) -> None:
self.run_id = run_id or uuid.uuid4().hex
self.collector: Union[_Collector, None] = None
self.collected_assets: Dict[str, Path] | None = None
self.s3_client = s3_client
self.bucket = bucket
self.upload_stats = {"uploaded": 0}

def pytest_runtest_logstart(self, nodeid, location):
self.collector = _Collector(nodeid=nodeid)

def pytest_runtest_logreport(self, report: pytest.TestReport):
self.collector.set_outcome(when=report.when, outcome=report.outcome)

def pytest_runtest_logfinish(self, nodeid, location):
# TODO: option to also upload on success?
if self.collector.outcomes.get("call") == "failed":
self._upload(self.collector)
def collect(self, path: Path, name: str):
"""Collect assets to upload"""
assert self.collected_assets is not None, "No active collection of assets"
self.collected_assets[name] = path

self.collector = None
def pytest_runtest_logstart(self, nodeid):
# Start new collection of assets for current test node
self.collected_assets = {}

def _upload(self, collector: _Collector):
for name, path in collector.assets.items():
nodeid = re.sub(r"[^a-zA-Z0-9_.-]", "_", collector.nodeid)
key = f"{self.run_id}!{nodeid}!{name}"
# TODO: get upload info in report?
_log.info(f"Uploading {path} to {self.bucket}/{key}")
def pytest_runtest_logreport(self, report: pytest.TestReport):
# TODO: option to upload on other outcome as well?
if report.when == "call" and report.outcome == "failed":
# TODO: what to do when upload fails?
uploaded = self._upload(nodeid=report.nodeid)
# TODO: report the uploaded assets somewhere (e.g. in user_properties or JSON report?)

def pytest_runtest_logfinish(self, nodeid):
# Reset collection of assets
self.collected_assets = None

def _upload(self, nodeid: str) -> Dict[str, str]:
assets = {}
for name, path in self.collected_assets.items():
safe_nodeid = re.sub(r"[^a-zA-Z0-9_.-]", "_", nodeid)
key = f"{self.run_id}!{safe_nodeid}!{name}"
# TODO: is this manual URL building correct and isn't there a boto utility for that?
url = f"{self.s3_client.meta.endpoint_url.rstrip('/')}/{self.bucket}/{key}"
_log.info(f"Uploading {path} to {url}")
self.s3_client.upload_file(
Filename=str(path),
Bucket=self.bucket,
Key=key,
# TODO: option to override ACL, or ExtraArgs in general?
ExtraArgs={"ACL": "public-read"},
)
assets[name] = url
self.upload_stats["uploaded"] += 1

def pytest_terminal_summary(self, terminalreporter):
terminalreporter.write_sep("-", f"`upload_assets` stats: {self.upload_stats}")


@pytest.fixture
Expand All @@ -163,7 +162,7 @@ def collect(*paths: Path):
# (e.g. when test uses an `actual` folder for actual results)
assert path.is_relative_to(tmp_path)
name = str(path.relative_to(tmp_path))
uploader.collector.collect(path=path, name=name)
uploader.collect(path=path, name=name)
else:
warnings.warn("Fixture `upload_assets` is a no-op (incomplete set up).")

Expand Down
1 change: 1 addition & 0 deletions qa/unittests/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
apex-algorithm-qa-tools
pytest>=8.2.0
moto[s3, server]
110 changes: 110 additions & 0 deletions qa/unittests/tests/test_pytest_upload_assets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import uuid

import boto3
import moto.server
import pytest


@pytest.fixture(scope="module")
def moto_server() -> str:
"""Fixture to run a mocked AWS server for testing."""
server = moto.server.ThreadedMotoServer()
server.start()
# TODO: avoid hardcoded port (5000)
yield "http://localhost:5000"
server.stop()


@pytest.fixture(autouse=True)
def aws_credentials(monkeypatch):
monkeypatch.setenv("AWS_ACCESS_KEY_ID", "test123")
monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "test456")


@pytest.fixture
def s3_client(moto_server):
return boto3.client("s3", endpoint_url=moto_server)


@pytest.fixture
def s3_bucket(s3_client) -> str:
# Unique bucket name for test isolation
bucket = f"test-bucket-{uuid.uuid4().hex}"
s3_client.create_bucket(Bucket=bucket)
return bucket


def test_basic_upload_on_fail(
pytester: pytest.Pytester, moto_server, s3_client, s3_bucket
):
pytester.makeconftest(
"""
pytest_plugins = [
"apex_algorithm_qa_tools.pytest_upload_assets",
]
"""
)
pytester.makepyfile(
test_file_maker="""
def test_fail_and_upload(upload_assets, tmp_path):
path = tmp_path / "hello.txt"
path.write_text("Hello world.")
upload_assets(path)
assert 3 == 5
"""
)

run_result = pytester.runpytest_subprocess(
"--upload-assets-run-id=test-run-123",
f"--upload-assets-endpoint-url={moto_server}",
f"--upload-assets-bucket={s3_bucket}",
)
run_result.stdout.re_match_lines(
[r"Plugin `upload_assets` is active, with upload to 'test-bucket-"]
)
run_result.assert_outcomes(failed=1)

object_listing = s3_client.list_objects(Bucket=s3_bucket)
assert len(object_listing["Contents"])
keys = [obj["Key"] for obj in object_listing["Contents"]]
expected_key = "test-run-123!test_file_maker.py__test_fail_and_upload!hello.txt"
assert keys == [expected_key]

actual = s3_client.get_object(Bucket=s3_bucket, Key=expected_key)
assert actual["Body"].read().decode("utf8") == "Hello world."

run_result.stdout.re_match_lines([r".*`upload_assets` stats: \{'uploaded': 1\}"])


def test_nop_on_success(pytester: pytest.Pytester, moto_server, s3_client, s3_bucket):
pytester.makeconftest(
"""
pytest_plugins = [
"apex_algorithm_qa_tools.pytest_upload_assets",
]
"""
)
pytester.makepyfile(
test_file_maker="""
def test_success(upload_assets, tmp_path):
path = tmp_path / "hello.txt"
path.write_text("Hello world.")
upload_assets(path)
assert 3 == 3
"""
)

run_result = pytester.runpytest_subprocess(
"--upload-assets-run-id=test-run-123",
f"--upload-assets-endpoint-url={moto_server}",
f"--upload-assets-bucket={s3_bucket}",
)
run_result.stdout.re_match_lines(
[r"Plugin `upload_assets` is active, with upload to 'test-bucket-"]
)
run_result.assert_outcomes(passed=1)

object_listing = s3_client.list_objects(Bucket=s3_bucket)
assert object_listing.get("Contents", []) == []

run_result.stdout.re_match_lines([r".*`upload_assets` stats: \{'uploaded': 0\}"])

0 comments on commit e3160f2

Please sign in to comment.