diff --git a/storage-app/requirements.txt b/storage-app/requirements.txt
index 3728fdb..1d73ba7 100644
--- a/storage-app/requirements.txt
+++ b/storage-app/requirements.txt
@@ -49,4 +49,16 @@
-requests==2.31.0
-urllib3==2.0.5
+requests==2.32.3
+urllib3==2.2.2
 httpcore==1.0.4
 httpx==0.27.0
+PyWavelets==1.6.0
+imagehash==4.3.1
+numpy==2.0.1
+pillow==10.4.0
+scipy==1.14.0
+brotli==1.1.0
+charset-normalizer==3.3.2
+imagedominantcolor==1.0.1
+mutagen==1.47.0
+pycryptodomex==3.20.0
+videohash==3.0.1
+yt-dlp==2024.7.25
diff --git a/storage-app/src/router/storage.py b/storage-app/src/router/storage.py
index bd9bd90..ddf8ffe 100644
--- a/storage-app/src/router/storage.py
+++ b/storage-app/src/router/storage.py
@@ -7,7 +7,11 @@
 
 
 @router.get("/api/storage/{bucket_name}/{file_id}/")
-async def get_file(request: Request, bucket_name: str, file_id: str) -> StreamingResponse:
+async def get_file(
+    request: Request,
+    bucket_name: str,
+    file_id: str
+) -> StreamingResponse:
     project_bucket: Bucket = Bucket(bucket_name)
     stream: ObjectStreaming | None = await project_bucket.get_object(file_id)
 
diff --git a/storage-app/src/shared/app_services.py b/storage-app/src/shared/app_services.py
index a6d8b6b..907ed2e 100644
--- a/storage-app/src/shared/app_services.py
+++ b/storage-app/src/shared/app_services.py
@@ -1,5 +1,6 @@
 from typing import Any, Pattern, Optional, AsyncGenerator
-from gridfs import NoFile, ObjectId
+from gridfs import NoFile
+from bson import ObjectId
 from fastapi import Request, status
 from fastapi.responses import StreamingResponse
 from re import compile, I, Match
diff --git a/storage-app/src/shared/hasher.py b/storage-app/src/shared/hasher.py
new file mode 100644
index 0000000..6f5fefe
--- /dev/null
+++ b/storage-app/src/shared/hasher.py
@@ -0,0 +1,70 @@
+from imagehash import whash, ImageHash
+from videohash import VideoHash
+from videohash.utils import (
+    create_and_return_temporary_directory as mk_temp_dir,
+    does_path_exists
+)
+from PIL import Image
+from os.path import join, sep
+from pathlib import Path
+from motor.motor_asyncio import AsyncIOMotorGridOut
+
+# Pillow 10 removed Image.ANTIALIAS; videohash still references it
+Image.ANTIALIAS = Image.Resampling.LANCZOS
+
+
+class VHashPatch(VideoHash):
+    hash: ImageHash
+    _file: AsyncIOMotorGridOut
+    _payload: bytes
+
+    def __init__(self, *args, payload: bytes = b"", **kwargs):
+        file, *_ = args
+        self._file = file
+        self._payload = payload
+        super().__init__(*args, **kwargs)
+
+    def _calc_hash(self): self.hash = whash(self.image)
+
+    def _create_required_dirs_and_check_for_errors(self):
+        if not self.storage_path: self.storage_path = mk_temp_dir()
+
+        assert does_path_exists(self.storage_path), \
+            f"Storage path '{self.storage_path}' does not exist."
+
+        self.storage_path = join(self.storage_path, f"{self.task_uid}{sep}")
+
+        self.video_dir = join(self.storage_path, f"video{sep}")
+        Path(self.video_dir).mkdir(parents=True, exist_ok=True)
+
+        self.video_download_dir = join(self.storage_path, f"downloadedvideo{sep}")
+        Path(self.video_download_dir).mkdir(parents=True, exist_ok=True)
+
+        self.frames_dir = join(self.storage_path, f"frames{sep}")
+        Path(self.frames_dir).mkdir(parents=True, exist_ok=True)
+
+        self.tiles_dir = join(self.storage_path, f"tiles{sep}")
+        Path(self.tiles_dir).mkdir(parents=True, exist_ok=True)
+
+        self.collage_dir = join(self.storage_path, f"collage{sep}")
+        Path(self.collage_dir).mkdir(parents=True, exist_ok=True)
+
+        self.horizontally_concatenated_image_dir = join(
+            self.storage_path,
+            f"horizontally_concatenated_image{sep}"
+        )
+        Path(self.horizontally_concatenated_image_dir).mkdir(
+            parents=True,
+            exist_ok=True
+        )
+
+    def _copy_video_to_video_dir(self):
+        assert self._file.metadata, "No file meta to read"
+
+        extension = self._file.metadata.get("file_extension")
+        self.video_path = join(self.video_dir, f"video.{extension}")
+
+        # payload bytes are read in the worker's event loop before this
+        # runs; awaiting the motor file here would nest run_until_complete
+        with open(self.video_path, "wb") as file:
+            file.write(self._payload)
diff --git a/storage-app/src/shared/settings.py b/storage-app/src/shared/settings.py
index a6452a9..45aa0c8 100644
--- a/storage-app/src/shared/settings.py
+++ b/storage-app/src/shared/settings.py
@@ -48,4 +48,9 @@
     "log_level": "debug" if DEBUG else "critical"
 }
 
-CELERY_CONFIG: list[str] = ["worker", "--loglevel=INFO"]
+CELERY_CONFIG: dict[str, Any] = {
+    "main": "worker",
+    "broker": BROKER_URL,
+    "backend": RESULT_URL
+}
+CELERY_ARGV: list[str] = ["worker", "--loglevel=info"]
diff --git a/storage-app/src/shared/worker_services.py b/storage-app/src/shared/worker_services.py
index 1a5dd45..714ea09 100644
--- a/storage-app/src/shared/worker_services.py
+++ b/storage-app/src/shared/worker_services.py
@@ -12,6 +12,8 @@
 import requests
 from os import mkdir, path, remove
 from motor.motor_asyncio import AsyncIOMotorGridOutCursor
+from PIL import Image
+from .hasher import whash, VHashPatch
 
 
 class Zipper:
@@ -20,8 +22,9 @@ class Zipper:
     temp_prefix = "./temp_zip"
 
     def __init__(self, bucket_name: str, file_ids: list[str]) -> None:
-        self.object_set: AsyncIOMotorGridOutCursor = Bucket(bucket_name) \
-            .get_download_objects(file_ids)
+        self.object_set: AsyncIOMotorGridOutCursor = Bucket(
+            bucket_name
+        ).get_download_objects(file_ids)
 
         self._get_annotation(bucket_name, file_ids)
 
@@ -29,18 +32,21 @@ def __init__(self, bucket_name: str, file_ids: list[str]) -> None:
         self.bucket_name = bucket_name
 
     async def archive_objects(self) -> Optional[bool]:
-        if not self.annotated or self.written: return
+        if not self.annotated or self.written:
+            return
 
-        if not path.exists(self.temp_prefix): mkdir(self.temp_prefix)
+        if not path.exists(self.temp_prefix):
+            mkdir(self.temp_prefix)
 
         self.archive = f"{self.temp_prefix}/{ObjectId()}.{self.archive_extension}"
-        json_data: Any = dumps(self.annotation, indent=4).encode('utf-8')
+        json_data: Any = dumps(self.annotation, indent=4).encode("utf-8")
 
-        with ZipFile(self.archive, 'w', ZIP_DEFLATED) as zip:
+        with ZipFile(self.archive, "w", ZIP_DEFLATED) as zip:
             try:
                 while object := await self.object_set.next():
                     zip.writestr(self._get_object_name(object), object.read())
-            except StopAsyncIteration: ...
+            except StopAsyncIteration:
+                ...
 
         with BytesIO(json_data) as annotation:
             zip.writestr("annotation.json", annotation.read())
@@ -48,24 +54,28 @@
         return self.written
 
     async def write_archive(self) -> Optional[str]:
-        if self.archive_id: return self.archive_id
+        if self.archive_id:
+            return self.archive_id
 
-        if not self.archive: raise FileExistsError
+        if not self.archive:
+            raise FileExistsError
 
-        with open(self.archive, 'rb') as archive:
-            self._archive_id: ObjectId = await DataBase \
-                .get_fs_bucket(TEMP_BUCKET) \
-                .upload_from_stream(
-                    filename=f"{self.bucket_name}_dataset",
-                    source=archive,
-                    metadata={"created_at": datetime.now().isoformat()}
-                )
+        with open(self.archive, "rb") as archive:
+            self._archive_id: ObjectId = await DataBase.get_fs_bucket(
+                TEMP_BUCKET
+            ).upload_from_stream(
+                filename=f"{self.bucket_name}_dataset",
+                source=archive,
+                metadata={"created_at": datetime.now().isoformat()},
+            )
 
-    def delete_temp_zip(self) -> None: remove(self.archive)
+    def delete_temp_zip(self) -> None:
+        remove(self.archive)
 
     def _get_object_name(self, object: GridOut) -> str:
         prepared_name: str = object.name
         extension: str = object.metadata.get("file_extension", "")
-        if extension: prepared_name += f".{extension}"
+        if extension:
+            prepared_name += f".{extension}"
 
         return prepared_name
@@ -79,20 +89,20 @@ def _get_annotation(self, bucket_name: str, file_ids: list[str]) -> Any:
             SECRET_ALGO,
         )
 
-        try: _, project_id = bucket_name.split('_')
-        except Exception: project_id = ""
+        try:
+            _, project_id = bucket_name.split("_")
+        except Exception:
+            project_id = ""
 
         headers: dict[str, Any] = {
             "Authorization": "Internal " + payload_token,
-            "Content-Type": "application/json"
-        }
-        payload: dict[str, Any] = {
-            "project_id": project_id,
-            "file_ids": file_ids
+            "Content-Type": "application/json",
         }
+        payload: dict[str, Any] = {"project_id": project_id, "file_ids": file_ids}
 
         response: requests.Response = requests.post(url, headers=headers, json=payload)
 
-        if response.status_code != 202: raise ConnectionError
+        if response.status_code != 202:
+            raise ConnectionError
 
         response_data: Any = response.json()
@@ -103,4 +113,34 @@
     @property
     def archive_id(self) -> Optional[str]:
         a_id: Any = self.__dict__.get("_archive_id")
-        if a_id: return str(a_id)
+        if a_id:
+            return str(a_id)
+
+
+class Hasher:
+    __slots__: tuple[str, ...] = ("bucket_name", "file_id", "file", "embedded")
= ("bucket_name", "file_id", "file", "embedded") + + def __init__(self, bucket_name: str, uid: str): + self.bucket_name = bucket_name + self.file_id = uid + + async def get_file(self): + file = await Bucket(self.bucket_name).get_object(self.file_id) + assert file, "No file found" + + async def hash(self): + match self.file.metadata.get("file_type"): + case "image": self.embedded = await self._image_hash() + case "video": self.embedded = await self._video_hash() + case _: raise ValueError("Unsupported file type") + + async def _image_hash(self): whash(await self.file.file.read()) + + async def _video_hash(self): VHashPatch(self.file.file) diff --git a/storage-app/src/worker.py b/storage-app/src/worker.py index 7c6b22b..7d89c71 100644 --- a/storage-app/src/worker.py +++ b/storage-app/src/worker.py @@ -1,28 +1,37 @@ from celery import Celery from shared.settings import BROKER_URL, RESULT_URL, CELERY_CONFIG -from shared.worker_services import Zipper +from shared.worker_services import Zipper, Hasher from asyncio import get_event_loop if not BROKER_URL or not RESULT_URL: raise ValueError("no broker environment") -WORKER: Celery = Celery() +worker: Celery = Celery(**CELERY_CONFIG) -WORKER.conf.broker_url = BROKER_URL -WORKER.conf.result_backend = RESULT_URL - -@WORKER.task(name="produce_download_task") +@worker.task(name="produce_download_task") def produce_download_task(bucket_name: str, file_ids: list[str]) -> str | None: - zipper: Zipper = Zipper(bucket_name, file_ids) + task: Zipper = Zipper(bucket_name, file_ids) + loop = get_event_loop() + + async def _inner(): + await task.archive_objects() + await task.write_archive() + task.delete_temp_zip() + + loop.run_until_complete(_inner()) + return task.archive_id + + +worker.task(name="produce_handle_media_task") +def produce_handle_media_task(bucket_name: str, uid: str) -> None: loop = get_event_loop() + task: Hasher = Hasher(bucket_name, uid) async def _inner(): - await zipper.archive_objects() - await zipper.write_archive() - zipper.delete_temp_zip() + await task.get_file() + await task.hash() loop.run_until_complete(_inner()) - return zipper.archive_id -if __name__ == "__main__": WORKER.worker_main(argv=CELERY_CONFIG) +if __name__ == "__main__": worker.start()