diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 2d175b8..85a6232 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -28,6 +28,6 @@ jobs: - name: CLI tests run: cd cli && pip install -e .[dev] && cd tests && python -m unittest - name: API tests - run: cd api && pip install -e .[dev] && cd tests && python -m unittest + run: cd api && pip install -e .[dev] && pytest tests - name: Webapp tests run: cd webapp && npm i && npm run eslint && npm run test diff --git a/api/secsend_api/app.py b/api/secsend_api/app.py index 90f72ef..deef607 100644 --- a/api/secsend_api/app.py +++ b/api/secsend_api/app.py @@ -146,7 +146,7 @@ async def download(request, id_): ) @bp.post("/delete/") -async def upload_finish(request, id_): +async def delete_id(request, id_): rid = RootID.from_str(id_) fid = rid.file_id() f = get_backend(request).open(fid) @@ -208,9 +208,9 @@ def declare_app(enable_cors=False, backend_files_root=None, html_root=None, time if html_root is not None: app.ctx.html_root = html_root - app.static("/", os.path.join(html_root, "index.html")) - app.static("/", html_root) - app.static("/dl", os.path.join(html_root, "dl.html")) + app.static("/", html_root, name="webapp") + app.static("/", os.path.join(html_root, "index.html"), name="webapp_index") + app.static("/dl", os.path.join(html_root, "dl.html"), name="dl") else: print("Warning: no html_root has been specified, sanic won't serve the webapp", file=sys.stderr) diff --git a/api/setup.py b/api/setup.py index a5b47ed..d4a6f11 100644 --- a/api/setup.py +++ b/api/setup.py @@ -16,11 +16,12 @@ packages=['secsend_api'], install_requires=[ 'jsonschema==4.15.*', - 'sanic==21.12.*', + 'sanic==23.12.1' ], extras_require={ 'dev': [ - 'sanic-testing==0.8.3', + 'sanic-testing==23.12.0', + 'pytest_asyncio==0.23.6' ], } ) diff --git a/api/tests/test_api.py b/api/tests/test_api.py index 94b32cd..4cf8afc 100644 --- a/api/tests/test_api.py +++ b/api/tests/test_api.py @@ -1,185 +1,172 @@ -import unittest +import pytest import tempfile import base64 import time +import os +from unittest.mock import patch from secsend_api import declare_app -from secsend_api.backend import FileID, RootID, BaseID, BackendErrorIDUnavailable -from secsend_api.backend_files import BackendFiles +from secsend_api.backend import RootID, BaseID, BackendErrorIDUnavailable from secsend_api.metadata import EncryptedFileMetadata -from sanic_testing.reusable import ReusableClient -from sanic_testing.testing import SanicTestClient -from sanic_testing import TestManager METADATA = EncryptedFileMetadata(name=b"ENCRYPTED_NAME", mime_type=b"ENCRYPTED_MIME_TYPE", iv=b"\x00"*12, chunk_size=b"ENCRYPTED_CHUNK_SIZE", key_sign=b"") -class TestBackendFiles(unittest.TestCase): - def setUp(self): - self.root = tempfile.TemporaryDirectory(prefix="secsend_api") - self.app = declare_app(enable_cors=False, backend_files_root=self.root.name, html_root=None, timeout_s_valid=[0,1]) - - def tearDown(self): - self.root.cleanup() - - def test_api_config(self): - _, response = self.app.test_client.get("/v1/config") - self.assertEqual(response.status, 200) - self.assertEqual(response.json['filesize_limit'], 0) - - def test_api_invalid_metadata(self): - _, response = self.app.test_client.post("/v1/upload/new", json={}) - self.assertEqual(response.status, 400) - - def test_api_unk_id(self): - rid = RootID.generate() - id_ = str(rid.file_id()) - for url in ("/metadata", "/download"): - _, response = self.app.test_client.get("%s/%s" % (url,id_)) - self.assertEqual(response.status, 404) - - _, response = self.app.test_client.post("/v1/upload/push/%s" % str(rid), json=METADATA.jsonable()) - self.assertEqual(response.status, 404) - - def test_api_invalid_id(self): - for id_ in ("0", base64.urlsafe_b64encode(b"AA")): - for url in ("metadata", "download"): - _, response = self.app.test_client.get("/v1/%s/%s" % (url,id_)) - self.assertEqual(response.status, 400) - - _, response = self.app.test_client.post("/v1/upload/push/%s" % id_, json=METADATA.jsonable()) - self.assertEqual(response.status, 400) - - def test_api_upload_download_delete(self): - client = self.app.test_client - _, response = client.post("/v1/upload/new", json=METADATA.jsonable()) - self.assertEqual(response.status, 200) - rid = response.json['root_id'] - rid = RootID.from_str(rid) - rid_s = str(rid) - - data = b"hello world!" - - _, response = client.post("/v1/upload/push/%s" % rid_s, data=data[:4]) - self.assertEqual(response.status, 200) - _, response = client.post("/v1/upload/push/%s" % rid_s, data=data[4:]) - self.assertEqual(response.status, 200) - _, response = client.post("/v1/upload/finish/%s" % rid_s) - self.assertEqual(response.status, 200) - - id_ = str(rid.file_id()) - _, response = client.get("/v1/metadata/%s" % id_) - self.assertEqual(response.status, 200) - d = response.json - self.assertEqual(d['size'], len(data)) - ret_metadata = d['metadata'] - self.assertTrue(ret_metadata['complete']) - del ret_metadata['complete'] - ref = METADATA.jsonable() - del ref['complete'] - self.assertEqual(ret_metadata, ref) - - _, response = client.get("/v1/download/%s" % id_) - self.assertEqual(response.status, 200) - self.assertEqual(response.read(), data) - - _, response = client.post("/v1/delete/%s" % id_) - self.assertEqual(response.status, 400) - - _, response = client.post("/v1/delete/%s" % rid_s) - self.assertEqual(response.status, 200) - - _, response = client.get("/v1/download/%s" % id_) - self.assertEqual(response.status, 404) - - def test_api_upload_timeout(self): - client = self.app.test_client - metadata = METADATA.jsonable() - metadata['timeout_s'] = 1 - _, response = client.post("/v1/upload/new", json=metadata) - self.assertEqual(response.status, 200) - rid = response.json['root_id'] - rid = RootID.from_str(rid) - rid_s = str(rid) - - data = b"hello world!" - - _, response = client.post("/v1/upload/push/%s" % rid_s, data=data) - self.assertEqual(response.status, 200) - time.sleep(2) - _, response = client.post("/v1/upload/finish/%s" % rid_s) - self.assertEqual(response.status, 200) - - id_ = str(rid.file_id()) - _, response = client.get("/v1/download/%s" % id_) - self.assertEqual(response.status, 200) - - time.sleep(2) - _, response = client.get("/v1/download/%s" % id_) - self.assertEqual(response.status, 404) - - def test_api_invalid_timeout(self): - client = self.app.test_client - metadata = METADATA.jsonable() - metadata['timeout_s'] = 4 - _, response = client.post("/v1/upload/new", json=metadata) - self.assertEqual(response.status, 400) - -class TestBackendFilesFilesizeLimit(unittest.TestCase): - def setUp(self): - self.root = tempfile.TemporaryDirectory(prefix="secsend_api") - self.filesize_limit = 1024 - self.app = declare_app(enable_cors=False, backend_files_root=self.root.name, html_root=None, timeout_s_valid=[0,1], filesize_limit=self.filesize_limit) - - def test_api_config(self): - _, response = self.app.test_client.get("/v1/config") - self.assertEqual(response.status, 200) - self.assertEqual(response.json['filesize_limit'], self.filesize_limit) - - def test_api_upload_okay(self): - client = self.app.test_client - _, response = client.post("/v1/upload/new", json=METADATA.jsonable()) - self.assertEqual(response.status, 200) - rid = response.json['root_id'] - rid = RootID.from_str(rid) - rid_s = str(rid) - - data = b"hello world!" - - _, response = client.post("/v1/upload/push/%s" % rid_s, data=data) - self.assertEqual(response.status, 200) - _, response = client.post("/v1/upload/finish/%s" % rid_s) - self.assertEqual(response.status, 200) - - def test_api_upload_toobig(self): - client = self.app.test_client - _, response = client.post("/v1/upload/new", json=METADATA.jsonable()) - self.assertEqual(response.status, 200) - rid = response.json['root_id'] - rid = RootID.from_str(rid) - rid_s = str(rid) - - data = b"A"*self.filesize_limit - - _, response = client.post("/v1/upload/push/%s" % rid_s, data=data[:4]) - self.assertEqual(response.status, 200) - _, response = client.post("/v1/upload/push/%s" % rid_s, data=data[4:]) - self.assertEqual(response.status, 400) - _, response = client.post("/v1/upload/finish/%s" % rid_s) - self.assertEqual(response.status, 404) - -class TestBackendFilesTinyIDs(unittest.TestCase): - def setUp(self): - self.root = tempfile.TemporaryDirectory(prefix="secsend_api") - self.app = declare_app(enable_cors=False, backend_files_root=self.root.name, html_root=None, timeout_s_valid=[0,1]) - self.org_ID_LEN = BaseID.ID_LEN - BaseID.ID_LEN = 1 - - def tearDown(self): - self.root.cleanup() - BaseID.ID_LEN = self.org_ID_LEN - - def test_api_lots_id(self): - client = self.app.test_client +@pytest.fixture +def app_backend_files(): + with tempfile.TemporaryDirectory(prefix="secsend_api") as root: + app = declare_app(enable_cors=False, backend_files_root=root, html_root=None, timeout_s_valid=[0,1]) + yield app + +def test_api_config(app_backend_files): + _, response = app_backend_files.test_client.get("/v1/config") + assert(response.status == 200) + assert(response.json['filesize_limit'] == 0) + +def test_api_invalid_metadata(app_backend_files): + _, response = app_backend_files.test_client.post("/v1/upload/new", json={}) + assert(response.status == 400) + +def test_api_unk_id(app_backend_files): + rid = RootID.generate() + id_ = str(rid.file_id()) + for url in ("/metadata", "/download"): + _, response = app_backend_files.test_client.get("%s/%s" % (url,id_)) + assert(response.status == 404) + + _, response = app_backend_files.test_client.post("/v1/upload/push/%s" % str(rid), json=METADATA.jsonable()) + assert(response.status == 404) + +def test_api_invalid_id(app_backend_files): + for id_ in ("0", base64.urlsafe_b64encode(b"AA")): + for url in ("metadata", "download"): + _, response = app_backend_files.test_client.get("/v1/%s/%s" % (url,id_)) + assert(response.status == 400) + + _, response = app_backend_files.test_client.post("/v1/upload/push/%s" % id_, json=METADATA.jsonable()) + assert(response.status == 400) + +def test_api_upload_download_delete(app_backend_files): + client = app_backend_files.test_client + _, response = client.post("/v1/upload/new", json=METADATA.jsonable()) + assert(response.status == 200) + rid = response.json['root_id'] + rid = RootID.from_str(rid) + rid_s = str(rid) + + data = b"hello world!" + + _, response = client.post("/v1/upload/push/%s" % rid_s, data=data[:4]) + assert(response.status == 200) + _, response = client.post("/v1/upload/push/%s" % rid_s, data=data[4:]) + assert(response.status == 200) + _, response = client.post("/v1/upload/finish/%s" % rid_s) + assert(response.status == 200) + + id_ = str(rid.file_id()) + _, response = client.get("/v1/metadata/%s" % id_) + assert(response.status == 200) + d = response.json + assert(d['size'] == len(data)) + ret_metadata = d['metadata'] + assert(ret_metadata['complete']) + del ret_metadata['complete'] + ref = METADATA.jsonable() + del ref['complete'] + assert(ret_metadata == ref) + + _, response = client.get("/v1/download/%s" % id_) + assert(response.status == 200) + assert(response.read() == data) + + _, response = client.post("/v1/delete/%s" % id_) + assert(response.status == 400) + + _, response = client.post("/v1/delete/%s" % rid_s) + assert(response.status == 200) + + _, response = client.get("/v1/download/%s" % id_) + assert(response.status == 404) + +def test_api_upload_timeout(app_backend_files): + client = app_backend_files.test_client + metadata = METADATA.jsonable() + metadata['timeout_s'] = 1 + _, response = client.post("/v1/upload/new", json=metadata) + assert(response.status == 200) + rid = response.json['root_id'] + rid = RootID.from_str(rid) + rid_s = str(rid) + + data = b"hello world!" + + _, response = client.post("/v1/upload/push/%s" % rid_s, data=data) + assert(response.status == 200) + time.sleep(2) + _, response = client.post("/v1/upload/finish/%s" % rid_s) + assert(response.status == 200) + + id_ = str(rid.file_id()) + _, response = client.get("/v1/download/%s" % id_) + assert(response.status == 200) + + time.sleep(2) + _, response = client.get("/v1/download/%s" % id_) + assert(response.status == 404) + +def test_api_invalid_timeout(app_backend_files): + client = app_backend_files.test_client + metadata = METADATA.jsonable() + metadata['timeout_s'] = 4 + _, response = client.post("/v1/upload/new", json=metadata) + assert(response.status == 400) + +FILESIZE_LIMIT = 1024 +@pytest.fixture +def app_backend_files_sizelimit(): + with tempfile.TemporaryDirectory(prefix="secsend_api") as root: + app = declare_app(enable_cors=False, backend_files_root=root, html_root=None, timeout_s_valid=[0,1], filesize_limit=FILESIZE_LIMIT) + yield app + +def test_api_config(app_backend_files_sizelimit): + _, response = app_backend_files_sizelimit.test_client.get("/v1/config") + assert(response.status == 200) + assert(response.json['filesize_limit'] == FILESIZE_LIMIT) + +def test_api_upload_okay(app_backend_files_sizelimit): + client = app_backend_files_sizelimit.test_client + _, response = client.post("/v1/upload/new", json=METADATA.jsonable()) + assert(response.status == 200) + rid = response.json['root_id'] + rid = RootID.from_str(rid) + rid_s = str(rid) + + data = b"hello world!" + + _, response = client.post("/v1/upload/push/%s" % rid_s, data=data) + assert(response.status == 200) + _, response = client.post("/v1/upload/finish/%s" % rid_s) + assert(response.status == 200) + +def test_api_upload_toobig(app_backend_files_sizelimit): + client = app_backend_files_sizelimit.test_client + _, response = client.post("/v1/upload/new", json=METADATA.jsonable()) + assert(response.status == 200) + rid = response.json['root_id'] + rid = RootID.from_str(rid) + rid_s = str(rid) + + data = b"A"*FILESIZE_LIMIT + + _, response = client.post("/v1/upload/push/%s" % rid_s, data=data[:4]) + assert(response.status == 200) + _, response = client.post("/v1/upload/push/%s" % rid_s, data=data[4:]) + assert(response.status == 400) + _, response = client.post("/v1/upload/finish/%s" % rid_s) + assert(response.status == 404) + +def test_api_lots_id(app_backend_files): + client = app_backend_files.test_client + with patch.object(BaseID, "ID_LEN", new=1): # Birthday paradox: after 128 insertions, we'll have a 50% chance to # hit an already existing file. At some point, we should catch a 500 # error. @@ -188,7 +175,24 @@ def test_api_lots_id(self): if response.status == 200: continue if response.status == 500: - self.assertEqual(response.json['message'], str(BackendErrorIDUnavailable())) + assert(response.json['message'] == str(BackendErrorIDUnavailable())) return # We should had come to a point where we were not able to catch an ID - self.assertTrue(False) + assert(False) + +@pytest.fixture +def app_backend_files_html(): + with tempfile.TemporaryDirectory(prefix="secsend_api") as root: + with tempfile.TemporaryDirectory(prefix="secsend_html_root") as html_root: + with open(os.path.join(html_root, "index.html"),"w") as f: + f.write("hello") + with open(os.path.join(html_root, "style.css"),"w") as f: + f.write("hello css") + app = declare_app(enable_cors=False, backend_files_root=root, html_root=html_root, timeout_s_valid=[0,1]) + yield app + +def test_html_index(app_backend_files_html): + _, response = app_backend_files_html.test_client.get("/index.html") + assert(response.text == "hello") + _, response = app_backend_files_html.test_client.get("/style.css") + assert(response.text == "hello css") diff --git a/api/tests/test_backend_files.py b/api/tests/test_backend_files.py index f9a7e7b..ba1b03b 100644 --- a/api/tests/test_backend_files.py +++ b/api/tests/test_backend_files.py @@ -1,48 +1,48 @@ -import unittest +import pytest import tempfile -import os from secsend_api.backend import RootID, FileID, BackendErrorIDExists, BackendErrorIDUnknown, BackendErrorFileLocked from secsend_api.metadata import EncryptedFileMetadata from secsend_api.backend_files import BackendFiles +pytest_plugins = ('pytest_asyncio',) + METADATA = EncryptedFileMetadata(name=b"ENCRYPTED_NAME", mime_type=b"ENCRYPTED_MIME_TYPE", iv=b"\x00"*16, chunk_size=b"ENCRYPTED_CHUNK_SIZE", key_sign=b"") -class TestBackendFiles(unittest.IsolatedAsyncioTestCase): - def setUp(self): - self.root = tempfile.TemporaryDirectory(prefix="secsend_api") - self.backend = BackendFiles(self.root.name) - - def tearDown(self): - self.root.cleanup() - - async def test_create_read(self): - fid = RootID.generate().file_id() - data = b"coucou" - async with self.backend.create(fid, METADATA).stream_append() as s: - await s.write(data) - - f = self.backend.open(fid) - self.assertEqual(f.metadata, METADATA) - async with f.stream_read() as s: - self.assertEqual(await s.read(), data) - - async def test_lock(self): - fid = RootID.generate().file_id() - f = self.backend.create(fid, METADATA) - with self.assertRaises(BackendErrorFileLocked): - async with f.lock_write(): - async with f.lock_write(): pass - - def test_create_exists(self): - fid = RootID.generate().file_id() - self.backend.create(fid, METADATA) - with self.assertRaises(BackendErrorIDExists): - self.backend.create(fid, METADATA) - - def test_read_unk(self): - fid = RootID.generate().file_id() - with self.assertRaises(BackendErrorIDUnknown): - # Force calling the metadata, otherwise it is lazy loaded and no - # exception happens - self.backend.open(fid).metadata +@pytest.fixture +def backend(): + with tempfile.TemporaryDirectory(prefix="secsend_api") as root: + yield BackendFiles(root) + +@pytest.mark.asyncio +async def test_create_read(backend): + fid = RootID.generate().file_id() + data = b"coucou" + async with backend.create(fid, METADATA).stream_append() as s: + await s.write(data) + + f = backend.open(fid) + assert(f.metadata == METADATA) + async with f.stream_read() as s: + assert(await s.read() == data) + +@pytest.mark.asyncio +async def test_lock(backend): + fid = RootID.generate().file_id() + f = backend.create(fid, METADATA) + with pytest.raises(BackendErrorFileLocked): + async with f.lock_write(): + async with f.lock_write(): pass + +def test_create_exists(backend): + fid = RootID.generate().file_id() + backend.create(fid, METADATA) + with pytest.raises(BackendErrorIDExists): + backend.create(fid, METADATA) + +def test_read_unk(backend): + fid = RootID.generate().file_id() + with pytest.raises(BackendErrorIDUnknown): + # Force calling the metadata, otherwise it is lazy loaded and no + # exception happens + backend.open(fid).metadata