diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 60314618..1badf198 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -13,12 +13,17 @@ The back-end core feature is to interact with the metadata tables. For the servi - Users: stores the hashed credentials and access level for users. - Cameras: stores the camera metadata. +- Organizations: scope the access to the API. #### Core worklow tables - Detection: association of a picture and a camera. -![UML diagram](https://github.com/user-attachments/assets/d0160a58-b494-4b81-bef0-b1a9f483be3e) +#### Client-related tables + +- Webhook: stores the webhook URLs. + +_The UML is versioned at [`scripts/dbdiagram.txt`](https://github.com/pyronear/pyro-api/blob/main/scripts/dbdiagram.txt) and the UML diagram is available on [DBDiagram](https://dbdiagram.io/d/Pyronear-UML-665a15d0b65d933879357b58)._ ### What is the full detection workflow through the API diff --git a/client/pyroclient/client.py b/client/pyroclient/client.py index 626835ed..1312a46d 100644 --- a/client/pyroclient/client.py +++ b/client/pyroclient/client.py @@ -23,6 +23,7 @@ # CAMERAS ################# "cameras-heartbeat": "/cameras/heartbeat", + "cameras-image": "/cameras/image", "cameras-fetch": "/cameras", ################# # DETECTIONS @@ -103,6 +104,22 @@ def headers(self) -> Dict[str, str]: return {"Authorization": f"Bearer {self.token}"} # CAMERAS + def fetch_cameras(self) -> Response: + """List the cameras accessible to the authenticated user + + >>> from pyroclient import client + >>> api_client = Client("MY_USER_TOKEN") + >>> response = api_client.fetch_cameras() + + Returns: + HTTP response + """ + return requests.get( + self.routes["cameras-fetch"], + headers=self.headers, + timeout=self.timeout, + ) + def heartbeat(self) -> Response: """Update the last ping of the camera @@ -115,6 +132,24 @@ def heartbeat(self) -> Response: """ return requests.patch(self.routes["cameras-heartbeat"], headers=self.headers, timeout=self.timeout) + def update_last_image(self, media: bytes) -> Response: + """Update the last image of the camera + + >>> from pyroclient import Client + >>> api_client = Client("MY_CAM_TOKEN") + >>> with open("path/to/my/file.ext", "rb") as f: data = f.read() + >>> response = api_client.update_last_image(data) + + Returns: + HTTP response containing the update device info + """ + return requests.patch( + self.routes["cameras-image"], + headers=self.headers, + files={"file": ("logo.png", media, "image/png")}, + timeout=self.timeout, + ) + # DETECTIONS def create_detection( self, @@ -171,22 +206,6 @@ def label_detection(self, detection_id: int, is_wildfire: bool) -> Response: timeout=self.timeout, ) - def fetch_cameras(self) -> Response: - """List the cameras accessible to the authenticated user - - >>> from pyroclient import client - >>> api_client = Client("MY_USER_TOKEN") - >>> response = api_client.fetch_cameras() - - Returns: - HTTP response - """ - return requests.get( - self.routes["cameras-fetch"], - headers=self.headers, - timeout=self.timeout, - ) - def get_detection_url(self, detection_id: int) -> Response: """Retrieve the URL of the media linked to a detection diff --git a/client/tests/test_client.py b/client/tests/test_client.py index a1d94342..2be897e9 100644 --- a/client/tests/test_client.py +++ b/client/tests/test_client.py @@ -26,7 +26,13 @@ def test_client_constructor(token, host, timeout, expected_error): @pytest.fixture(scope="session") def test_cam_workflow(cam_token, mock_img): cam_client = Client(cam_token, "http://localhost:5050", timeout=10) - assert cam_client.heartbeat().status_code == 200 + response = cam_client.heartbeat() + assert response.status_code == 200 + # Check that last_image gets changed + assert response.json()["last_image"] is None + response = cam_client.update_last_image(mock_img) + assert response.status_code == 200, response.__dict__ + assert isinstance(response.json()["last_image"], str) # Check that adding bboxes works with pytest.raises(ValueError, match="bboxes must be a non-empty list of tuples"): cam_client.create_detection(mock_img, 123.2, None) diff --git a/pyproject.toml b/pyproject.toml index e07a2832..03105ced 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -158,5 +158,7 @@ module = [ "botocore.*", "databases", "posthog", + "prometheus_fastapi_instrumentator", + "pydantic_settings", ] ignore_missing_imports = true diff --git a/scripts/dbdiagram.txt b/scripts/dbdiagram.txt index 8a6bd018..95dd3330 100644 --- a/scripts/dbdiagram.txt +++ b/scripts/dbdiagram.txt @@ -8,8 +8,8 @@ Table "User" as U { "id" int [not null] "organization_id" int [ref: > O.id, not null] "role" userrole [not null] - "login" str [not null] - "hashed_password" str [not null] + "login" text [not null] + "hashed_password" text [not null] "created_at" timestamp [not null] Indexes { (id, login) [pk] @@ -19,7 +19,7 @@ Table "User" as U { Table "Camera" as C { "id" int [not null] "organization_id" int [ref: > O.id, not null] - "name" str [not null] + "name" text [not null] "angle_of_view" float [not null] "elevation" float [not null] "lat" float [not null] @@ -27,6 +27,7 @@ Table "Camera" as C { "is_trustable" bool [not null] "created_at" timestamp [not null] "last_active_at" timestamp + "last_image" text Indexes { (id) [pk] } @@ -36,8 +37,8 @@ Table "Detection" as D { "id" int [not null] "camera_id" int [ref: > C.id, not null] "azimuth" float [not null] - "bucket_key" str [not null] - "bboxes" str [not null] + "bucket_key" text [not null] + "bboxes" text [not null] "is_wildfire" bool "created_at" timestamp [not null] "updated_at" timestamp [not null] @@ -48,7 +49,7 @@ Table "Detection" as D { Table "Organization" as O { "id" int [not null] - "name" str [not null] + "name" text [not null] Indexes { (id) [pk] } @@ -57,7 +58,7 @@ Table "Organization" as O { Table "Webhook" as W { "id" int [not null] - "url" str [not null] + "url" text [not null] Indexes { (id) [pk] } diff --git a/src/app/api/api_v1/endpoints/cameras.py b/src/app/api/api_v1/endpoints/cameras.py index c31f8ce2..4d0d91d4 100644 --- a/src/app/api/api_v1/endpoints/cameras.py +++ b/src/app/api/api_v1/endpoints/cameras.py @@ -6,15 +6,16 @@ from datetime import datetime from typing import List, cast -from fastapi import APIRouter, Depends, HTTPException, Path, Security, status +from fastapi import APIRouter, Depends, File, HTTPException, Path, Security, UploadFile, status from app.api.dependencies import get_camera_crud, get_jwt from app.core.config import settings from app.core.security import create_access_token from app.crud import CameraCRUD from app.models import Camera, Role, UserRole -from app.schemas.cameras import CameraCreate, LastActive +from app.schemas.cameras import CameraCreate, LastActive, LastImage from app.schemas.login import Token, TokenPayload +from app.services.storage import s3_service, upload_file from app.services.telemetry import telemetry_client router = APIRouter() @@ -62,10 +63,26 @@ async def heartbeat( cameras: CameraCRUD = Depends(get_camera_crud), token_payload: TokenPayload = Security(get_jwt, scopes=[Role.CAMERA]), ) -> Camera: - # telemetry_client.capture(f"camera|{token_payload.sub}", event="cameras-heartbeat", properties={"camera_id": camera_id}) + # telemetry_client.capture(f"camera|{token_payload.sub}", event="cameras-heartbeat") return await cameras.update(token_payload.sub, LastActive(last_active_at=datetime.utcnow())) +@router.patch("/image", status_code=status.HTTP_200_OK, summary="Update last image of a camera") +async def update_image( + file: UploadFile = File(..., alias="file"), + cameras: CameraCRUD = Depends(get_camera_crud), + token_payload: TokenPayload = Security(get_jwt, scopes=[Role.CAMERA]), +) -> Camera: + # telemetry_client.capture(f"camera|{token_payload.sub}", event="cameras-image") + bucket_key = await upload_file(file, token_payload.organization_id, token_payload.sub) + # If the upload succeeds, delete the previous image + cam = cast(Camera, await cameras.get(token_payload.sub, strict=True)) + if isinstance(cam.last_image, str): + s3_service.get_bucket(s3_service.resolve_bucket_name(token_payload.organization_id)).delete_file(cam.last_image) + # Update the DB entry + return await cameras.update(token_payload.sub, LastImage(last_image=bucket_key, last_active_at=datetime.utcnow())) + + @router.post("/{camera_id}/token", status_code=status.HTTP_200_OK, summary="Request an access token for the camera") async def create_camera_token( camera_id: int = Path(..., gt=0), diff --git a/src/app/api/api_v1/endpoints/detections.py b/src/app/api/api_v1/endpoints/detections.py index 27f33947..bee700ac 100644 --- a/src/app/api/api_v1/endpoints/detections.py +++ b/src/app/api/api_v1/endpoints/detections.py @@ -3,13 +3,9 @@ # This program is licensed under the Apache License 2.0. # See LICENSE or go to for full license details. -import hashlib -import logging from datetime import datetime -from mimetypes import guess_extension from typing import List, cast -import magic from fastapi import ( APIRouter, BackgroundTasks, @@ -37,7 +33,7 @@ DetectionWithUrl, ) from app.schemas.login import TokenPayload -from app.services.storage import s3_service +from app.services.storage import s3_service, upload_file from app.services.telemetry import telemetry_client router = APIRouter() @@ -69,37 +65,7 @@ async def create_detection( ) # Upload media - # Concatenate the first 8 chars (to avoid system interactions issues) of SHA256 hash with file extension - sha_hash = hashlib.sha256(file.file.read()).hexdigest() - await file.seek(0) - # Use MD5 to verify upload - md5_hash = hashlib.md5(file.file.read()).hexdigest() # noqa S324 - await file.seek(0) - # guess_extension will return none if this fails - extension = guess_extension(magic.from_buffer(file.file.read(), mime=True)) or "" - # Concatenate timestamp & hash - bucket_key = f"{token_payload.sub}-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{sha_hash[:8]}{extension}" - # Reset byte position of the file (cf. https://fastapi.tiangolo.com/tutorial/request-files/#uploadfile) - await file.seek(0) - bucket_name = s3_service.resolve_bucket_name(token_payload.organization_id) - bucket = s3_service.get_bucket(bucket_name) - # Upload the file - if not bucket.upload_file(bucket_key, file.file): # type: ignore[arg-type] - raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed upload") - logging.info(f"File uploaded to bucket {bucket_name} with key {bucket_key}.") - - # Data integrity check - file_meta = bucket.get_file_metadata(bucket_key) - # Corrupted file - if md5_hash != file_meta["ETag"].replace('"', ""): - # Delete the corrupted upload - bucket.delete_file(bucket_key) - # Raise the exception - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Data was corrupted during upload", - ) - # Format the string + bucket_key = await upload_file(file, token_payload.organization_id, token_payload.sub) det = await detections.create( DetectionCreate(camera_id=token_payload.sub, bucket_key=bucket_key, azimuth=azimuth, bboxes=bboxes) ) diff --git a/src/app/models.py b/src/app/models.py index fca0f157..6dee673a 100644 --- a/src/app/models.py +++ b/src/app/models.py @@ -28,8 +28,9 @@ class Role(str, Enum): class User(SQLModel, table=True): + __tablename__ = "users" id: int = Field(None, primary_key=True) - organization_id: int = Field(..., foreign_key="organization.id", nullable=False) + organization_id: int = Field(..., foreign_key="organizations.id", nullable=False) role: UserRole = Field(UserRole.USER, nullable=False) # Allow sign-up/in via login + password login: str = Field(..., index=True, unique=True, min_length=2, max_length=50, nullable=False) @@ -38,8 +39,9 @@ class User(SQLModel, table=True): class Camera(SQLModel, table=True): + __tablename__ = "cameras" id: int = Field(None, primary_key=True) - organization_id: int = Field(..., foreign_key="organization.id", nullable=False) + organization_id: int = Field(..., foreign_key="organizations.id", nullable=False) name: str = Field(..., min_length=5, max_length=100, nullable=False, unique=True) angle_of_view: float = Field(..., gt=0, le=360, nullable=False) elevation: float = Field(..., gt=0, lt=10000, nullable=False) @@ -47,12 +49,14 @@ class Camera(SQLModel, table=True): lon: float = Field(..., gt=-180, lt=180) is_trustable: bool = True last_active_at: Union[datetime, None] = None + last_image: Union[str, None] = None created_at: datetime = Field(default_factory=datetime.utcnow, nullable=False) class Detection(SQLModel, table=True): + __tablename__ = "detections" id: int = Field(None, primary_key=True) - camera_id: int = Field(..., foreign_key="camera.id", nullable=False) + camera_id: int = Field(..., foreign_key="cameras.id", nullable=False) azimuth: float = Field(..., gt=0, lt=360) bucket_key: str bboxes: str = Field(..., min_length=2, max_length=settings.MAX_BBOX_STR_LENGTH, nullable=False) @@ -62,10 +66,12 @@ class Detection(SQLModel, table=True): class Organization(SQLModel, table=True): + __tablename__ = "organizations" id: int = Field(None, primary_key=True) name: str = Field(..., min_length=5, max_length=100, nullable=False, unique=True) class Webhook(SQLModel, table=True): + __tablename__ = "webhooks" id: int = Field(None, primary_key=True) url: str = Field(..., nullable=False, unique=True) diff --git a/src/app/schemas/cameras.py b/src/app/schemas/cameras.py index 910f2779..2afdb6aa 100644 --- a/src/app/schemas/cameras.py +++ b/src/app/schemas/cameras.py @@ -17,6 +17,10 @@ class LastActive(BaseModel): last_active_at: datetime = Field(default_factory=datetime.utcnow) +class LastImage(LastActive): + last_image: str + + class CameraCreate(BaseModel): organization_id: int = Field(..., gt=0) name: str = Field( diff --git a/src/app/services/storage.py b/src/app/services/storage.py index 5841d258..efcddeba 100644 --- a/src/app/services/storage.py +++ b/src/app/services/storage.py @@ -3,16 +3,20 @@ # This program is licensed under the Apache License 2.0. # See LICENSE or go to for full license details. +import hashlib import logging +from datetime import datetime +from mimetypes import guess_extension from typing import Any, Dict, Union import boto3 +import magic from botocore.exceptions import ClientError, EndpointConnectionError, NoCredentialsError, PartialCredentialsError -from fastapi import HTTPException, status +from fastapi import HTTPException, UploadFile, status from app.core.config import settings -__all__ = ["s3_service"] +__all__ = ["s3_service", "upload_file"] logger = logging.getLogger("uvicorn.warning") @@ -146,6 +150,41 @@ def resolve_bucket_name(organization_id: int) -> str: return f"alert-api-{organization_id!s}" +async def upload_file(file: UploadFile, organization_id: int, camera_id: int) -> str: + """Upload a file to S3 storage and return the public URL""" + # Concatenate the first 8 chars (to avoid system interactions issues) of SHA256 hash with file extension + sha_hash = hashlib.sha256(file.file.read()).hexdigest() + await file.seek(0) + # Use MD5 to verify upload + md5_hash = hashlib.md5(file.file.read()).hexdigest() # noqa S324 + await file.seek(0) + # guess_extension will return none if this fails + extension = guess_extension(magic.from_buffer(file.file.read(), mime=True)) or "" + # Concatenate timestamp & hash + bucket_key = f"{camera_id}-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{sha_hash[:8]}{extension}" + # Reset byte position of the file (cf. https://fastapi.tiangolo.com/tutorial/request-files/#uploadfile) + await file.seek(0) + bucket_name = s3_service.resolve_bucket_name(organization_id) + bucket = s3_service.get_bucket(bucket_name) + # Upload the file + if not bucket.upload_file(bucket_key, file.file): # type: ignore[arg-type] + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed upload") + logging.info(f"File uploaded to bucket {bucket_name} with key {bucket_key}.") + + # Data integrity check + file_meta = bucket.get_file_metadata(bucket_key) + # Corrupted file + if md5_hash != file_meta["ETag"].replace('"', ""): + # Delete the corrupted upload + bucket.delete_file(bucket_key) + # Raise the exception + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Data was corrupted during upload", + ) + return bucket_key + + s3_service = S3Service( settings.S3_REGION, settings.S3_ENDPOINT_URL, settings.S3_ACCESS_KEY, settings.S3_SECRET_KEY, settings.S3_PROXY_URL ) diff --git a/src/tests/conftest.py b/src/tests/conftest.py index 9298a23f..0f885f43 100644 --- a/src/tests/conftest.py +++ b/src/tests/conftest.py @@ -71,6 +71,7 @@ "lon": -45.2, "is_trustable": True, "last_active_at": datetime.strptime("2023-11-07T15:07:19.226673", dt_format), + "last_image": None, "created_at": datetime.strptime("2023-11-07T15:07:19.226673", dt_format), }, { @@ -83,6 +84,7 @@ "lon": -45.2, "is_trustable": False, "last_active_at": None, + "last_image": None, "created_at": datetime.strptime("2023-11-07T15:07:19.226673", dt_format), }, ] @@ -185,7 +187,9 @@ async def organization_session(async_session: AsyncSession): async_session.add(Organization(**entry)) await async_session.commit() await async_session.exec( - text(f"ALTER SEQUENCE organization_id_seq RESTART WITH {max(entry['id'] for entry in ORGANIZATION_TABLE) + 1}") + text( + f"ALTER SEQUENCE {Organization.__tablename__}_id_seq RESTART WITH {max(entry['id'] for entry in ORGANIZATION_TABLE) + 1}" + ) ) await async_session.commit() # Create buckets @@ -207,7 +211,9 @@ async def webhook_session(async_session: AsyncSession): async_session.add(Webhook(**entry)) await async_session.commit() await async_session.exec( - text(f"ALTER SEQUENCE webhook_id_seq RESTART WITH {max(entry['id'] for entry in WEBHOOK_TABLE) + 1}") + text( + f"ALTER SEQUENCE {Webhook.__tablename__}_id_seq RESTART WITH {max(entry['id'] for entry in WEBHOOK_TABLE) + 1}" + ) ) await async_session.commit() yield async_session @@ -222,7 +228,7 @@ async def user_session(organization_session: AsyncSession, monkeypatch): organization_session.add(User(**entry)) await organization_session.commit() await organization_session.exec( - text(f"ALTER SEQUENCE user_id_seq RESTART WITH {max(entry['id'] for entry in USER_TABLE) + 1}") + text(f"ALTER SEQUENCE {User.__tablename__}_id_seq RESTART WITH {max(entry['id'] for entry in USER_TABLE) + 1}") ) await organization_session.commit() yield organization_session @@ -235,7 +241,7 @@ async def camera_session(user_session: AsyncSession, organization_session: Async user_session.add(Camera(**entry)) await user_session.commit() await user_session.exec( - text(f"ALTER SEQUENCE camera_id_seq RESTART WITH {max(entry['id'] for entry in CAM_TABLE) + 1}") + text(f"ALTER SEQUENCE {Camera.__tablename__}_id_seq RESTART WITH {max(entry['id'] for entry in CAM_TABLE) + 1}") ) await user_session.commit() yield user_session @@ -251,7 +257,9 @@ async def detection_session( await user_session.commit() # Update the detection index count await user_session.exec( - text(f"ALTER SEQUENCE detection_id_seq RESTART WITH {max(entry['id'] for entry in DET_TABLE) + 1}") + text( + f"ALTER SEQUENCE {Detection.__tablename__}_id_seq RESTART WITH {max(entry['id'] for entry in DET_TABLE) + 1}" + ) ) await user_session.commit() # Create bucket files diff --git a/src/tests/endpoints/test_cameras.py b/src/tests/endpoints/test_cameras.py index f13aca8c..fe88af3e 100644 --- a/src/tests/endpoints/test_cameras.py +++ b/src/tests/endpoints/test_cameras.py @@ -91,7 +91,9 @@ async def test_create_camera( assert response.json()["detail"] == status_detail if response.status_code // 100 == 2: assert { - k: v for k, v in response.json().items() if k not in {"id", "created_at", "last_active_at", "is_trustable"} + k: v + for k, v in response.json().items() + if k not in {"id", "created_at", "last_active_at", "is_trustable", "last_image"} } == payload @@ -273,7 +275,52 @@ async def test_heartbeat( if isinstance(status_detail, str): assert response.json()["detail"] == status_detail if response.status_code // 100 == 2: - assert response.json()["last_active_at"] != pytest.camera_table[cam_idx]["last_active_at"] + assert isinstance(response.json()["last_active_at"], str) + if pytest.camera_table[cam_idx]["last_active_at"] is not None: + assert response.json()["last_active_at"] > pytest.camera_table[cam_idx]["last_active_at"] assert {k: v for k, v in response.json().items() if k != "last_active_at"} == { k: v for k, v in pytest.camera_table[cam_idx].items() if k != "last_active_at" } + + +@pytest.mark.parametrize( + ("cam_idx", "status_code", "status_detail"), + [ + (None, 401, "Not authenticated"), + (0, 200, None), + (1, 200, None), + ], +) +@pytest.mark.asyncio +async def test_update_image( + async_client: AsyncClient, + camera_session: AsyncSession, + mock_img: bytes, + cam_idx: Union[int, None], + status_code: int, + status_detail: Union[str, None], +): + auth = None + if isinstance(cam_idx, int): + auth = pytest.get_token( + pytest.camera_table[cam_idx]["id"], + ["camera"], + pytest.camera_table[cam_idx]["organization_id"], + ) + + response = await async_client.patch( + "/cameras/image", files={"file": ("logo.png", mock_img, "image/png")}, headers=auth + ) + assert response.status_code == status_code, print(response.__dict__) + if isinstance(status_detail, str): + assert response.json()["detail"] == status_detail + if response.status_code // 100 == 2: + assert isinstance(response.json()["last_active_at"], str) + if pytest.camera_table[cam_idx]["last_active_at"] is not None: + assert response.json()["last_active_at"] > pytest.camera_table[cam_idx]["last_active_at"] + assert isinstance(response.json()["last_image"], str) + if pytest.camera_table[cam_idx]["last_image"] is not None: + assert response.json()["last_image"] != pytest.camera_table[cam_idx]["last_image"] + assert {k: v for k, v in response.json().items() if k not in {"last_active_at", "last_image"}} == { + k: v for k, v in pytest.camera_table[cam_idx].items() if k not in {"last_active_at", "last_image"} + }