
Commit
hash worker
githubering182 committed Jul 31, 2024
1 parent b91d9c1 commit 0564066
Showing 7 changed files with 177 additions and 43 deletions.
14 changes: 14 additions & 0 deletions storage-app/requirements.txt
@@ -50,3 +50,17 @@ requests==2.31.0
 urllib3==2.0.5
 httpcore==1.0.4
 httpx==0.27.0
+PyWavelets==1.6.0
+imagehash==4.3.1
+numpy==2.0.1
+pillow==10.4.0
+scipy==1.14.0
+brotli==1.1.0
+charset-normalizer==3.3.2
+imagedominantcolor==1.0.1
+mutagen==1.47.0
+pycryptodomex==3.20.0
+requests==2.32.3
+urllib3==2.2.2
+videohash==3.0.1
+yt-dlp==2024.7.25
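
The new pins above (PyWavelets, imagehash, pillow, scipy, videohash, yt-dlp) back the perceptual-hashing worker added in this commit. For reference, a minimal sketch of the imagehash wavelet-hash API these versions provide (file names are placeholders):

    from PIL import Image
    import imagehash

    # whash() reduces an image to a small wavelet-based fingerprint;
    # visually similar images yield hashes with a small Hamming distance.
    h1 = imagehash.whash(Image.open("frame_a.png"))
    h2 = imagehash.whash(Image.open("frame_b.png"))
    print(h1, h2, h1 - h2)  # "-" between two ImageHash values is the bit distance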
6 changes: 5 additions & 1 deletion storage-app/src/router/storage.py
@@ -7,7 +7,11 @@


 @router.get("/api/storage/{bucket_name}/{file_id}/")
-async def get_file(request: Request, bucket_name: str, file_id: str) -> StreamingResponse:
+async def get_file(
+    request: Request,
+    bucket_name: str,
+    file_id: str
+) -> StreamingResponse:
     project_bucket: Bucket = Bucket(bucket_name)
     stream: ObjectStreaming | None = await project_bucket.get_object(file_id)
 
3 changes: 2 additions & 1 deletion storage-app/src/shared/app_services.py
@@ -1,5 +1,6 @@
 from typing import Any, Pattern, Optional, AsyncGenerator
-from gridfs import NoFile, ObjectId
+from gridfs import NoFile
+from bson import ObjectId
 from fastapi import Request, status
 from fastapi.responses import StreamingResponse
 from re import compile, I, Match
69 changes: 69 additions & 0 deletions storage-app/src/shared/hasher.py
@@ -0,0 +1,69 @@
+from imagehash import whash, ImageHash
+from videohash import VideoHash
+from videohash.utils import (
+    create_and_return_temporary_directory as mk_temp_dir,
+    does_path_exists
+)
+from PIL import Image
+from os.path import join, sep
+from pathlib import Path
+from asyncio import get_event_loop
+from motor.motor_asyncio import AsyncIOMotorGridOut
+
+Image.ANTIALIAS = Image.Resampling.LANCZOS
+
+
+class VHashPatch(VideoHash):
+    hash: ImageHash
+    _file: AsyncIOMotorGridOut
+
+    def __init__(self, *args, **kwargs):
+        file, *_ = args
+        self._file = file
+        super().__init__(*args, **kwargs)
+
+    def _calc_hash(self): self.hash = whash(self.image)
+
+    def _create_required_dirs_and_check_for_errors(self):
+        if not self.storage_path: self.storage_path = mk_temp_dir()
+
+        assert does_path_exists(self.storage_path), f"Storage path '{self.storage_path}' does not exist."
+
+        self.storage_path = join(self.storage_path, (f"{self.task_uid}{sep}"))
+
+        self.video_dir = join(self.storage_path, (f"video{sep}"))
+        Path(self.video_dir).mkdir(parents=True, exist_ok=True)
+
+        self.video_download_dir = join(self.storage_path, (f"downloadedvideo{sep}"))
+        Path(self.video_download_dir).mkdir(parents=True, exist_ok=True)
+
+        self.frames_dir = join(self.storage_path, (f"frames{sep}"))
+        Path(self.frames_dir).mkdir(parents=True, exist_ok=True)
+
+        self.tiles_dir = join(self.storage_path, (f"tiles{sep}"))
+        Path(self.tiles_dir).mkdir(parents=True, exist_ok=True)
+
+        self.collage_dir = join(self.storage_path, (f"collage{sep}"))
+        Path(self.collage_dir).mkdir(parents=True, exist_ok=True)
+
+        self.horizontally_concatenated_image_dir = join(
+            self.storage_path,
+            f"horizontally_concatenated_image{sep}"
+        )
+        Path(self.horizontally_concatenated_image_dir).mkdir(
+            parents=True,
+            exist_ok=True
+        )
+
+    def _copy_video_to_video_dir(self):
+        assert self._file.metadata, "No file meta to read"
+
+        extension = self._file.metadata.get("file_extension")
+        self.video_path = join(self.video_dir, f"video.{extension}")
+
+        get_event_loop().run_until_complete(self._write_file())
+
+    async def _write_file(self):
+        with open(self.video_path, "wb") as file:
+            self._file.seek(0)
+            file.write(await self._file.read())
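
VHashPatch reuses videohash's frame-extraction pipeline but swaps its hashing step for imagehash.whash and feeds the video from a GridFS object instead of a path or URL. For comparison, a minimal sketch of the unpatched upstream API (the local path is a placeholder; FFmpeg must be on PATH):

    from videohash import VideoHash

    # Upstream flow: extract frames with FFmpeg, tile them into a collage,
    # and hash the collage; .hash is a 64-bit string, .hash_hex its hex form.
    v = VideoHash(path="/tmp/clip.mp4")
    print(v.hash, v.hash_hex)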
7 changes: 6 additions & 1 deletion storage-app/src/shared/settings.py
@@ -48,4 +48,9 @@
"log_level": "debug" if DEBUG else "critical"
}

CELERY_CONFIG: list[str] = ["worker", "--loglevel=INFO"]
CELERY_CONFIG: dict[str, Any] = {
"main": "worker",
"broker": BROKER_URL,
"backend": RESULT_URL,
"log": "info"
}
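
CELERY_CONFIG is now keyword arguments for the Celery constructor rather than a worker argv list; worker.py below unpacks it as Celery(**CELERY_CONFIG). A minimal sketch of the equivalent explicit call, assuming the BROKER_URL and RESULT_URL values defined earlier in this module:

    from celery import Celery

    # "main" names the app; "broker" and "backend" point Celery at the
    # message broker and the result store.
    app = Celery(main="worker", broker=BROKER_URL, backend=RESULT_URL)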
88 changes: 60 additions & 28 deletions storage-app/src/shared/worker_services.py
@@ -12,6 +12,7 @@
 import requests
 from os import mkdir, path, remove
 from motor.motor_asyncio import AsyncIOMotorGridOutCursor
+from .hasher import whash, VHashPatch
 
 
 class Zipper:
@@ -20,26 +21,30 @@ class Zipper:
     temp_prefix = "./temp_zip"
 
     def __init__(self, bucket_name: str, file_ids: list[str]) -> None:
-        self.object_set: AsyncIOMotorGridOutCursor = Bucket(bucket_name) \
-            .get_download_objects(file_ids)
+        self.object_set: AsyncIOMotorGridOutCursor = Bucket(
+            bucket_name
+        ).get_download_objects(file_ids)
 
         self._get_annotation(bucket_name, file_ids)
 
         self.archive: str = ""
         self.bucket_name = bucket_name
 
     async def archive_objects(self) -> Optional[bool]:
-        if not self.annotated or self.written: return
+        if not self.annotated or self.written:
+            return
 
-        if not path.exists(self.temp_prefix): mkdir(self.temp_prefix)
+        if not path.exists(self.temp_prefix):
+            mkdir(self.temp_prefix)
         self.archive = f"{self.temp_prefix}/{ObjectId()}.{self.archive_extension}"
-        json_data: Any = dumps(self.annotation, indent=4).encode('utf-8')
+        json_data: Any = dumps(self.annotation, indent=4).encode("utf-8")
 
-        with ZipFile(self.archive, 'w', ZIP_DEFLATED) as zip:
+        with ZipFile(self.archive, "w", ZIP_DEFLATED) as zip:
             try:
                 while object := await self.object_set.next():
                     zip.writestr(self._get_object_name(object), object.read())
-            except StopAsyncIteration: ...
+            except StopAsyncIteration:
+                ...
 
             with BytesIO(json_data) as annotation:
                 zip.writestr("annotation.json", annotation.read())
@@ -48,26 +53,30 @@ async def archive_objects(self) -> Optional[bool]:
         return self.written
 
     async def write_archive(self) -> Optional[str]:
-        if self.archive_id: return self.archive_id
+        if self.archive_id:
+            return self.archive_id
 
-        if not self.archive: raise FileExistsError
+        if not self.archive:
+            raise FileExistsError
 
-        with open(self.archive, 'rb') as archive:
-            self._archive_id: ObjectId = await DataBase \
-                .get_fs_bucket(TEMP_BUCKET) \
-                .upload_from_stream(
-                    filename=f"{self.bucket_name}_dataset",
-                    source=archive,
-                    metadata={"created_at": datetime.now().isoformat()}
-                )
+        with open(self.archive, "rb") as archive:
+            self._archive_id: ObjectId = await DataBase.get_fs_bucket(
+                TEMP_BUCKET
+            ).upload_from_stream(
+                filename=f"{self.bucket_name}_dataset",
+                source=archive,
+                metadata={"created_at": datetime.now().isoformat()},
+            )
 
-    def delete_temp_zip(self) -> None: remove(self.archive)
+    def delete_temp_zip(self) -> None:
+        remove(self.archive)
 
     def _get_object_name(self, object: GridOut) -> str:
         prepared_name: str = object.name
         extension: str = object.metadata.get("file_extension", "")
 
-        if extension: prepared_name += f".{extension}"
+        if extension:
+            prepared_name += f".{extension}"
 
         return prepared_name
 
@@ -79,21 +88,21 @@ def _get_annotation(self, bucket_name: str, file_ids: list[str]) -> Any:
             SECRET_ALGO,
         )
 
-        try: _, project_id = bucket_name.split('_')
-        except Exception: project_id = ""
+        try:
+            _, project_id = bucket_name.split("_")
+        except Exception:
+            project_id = ""
 
         headers: dict[str, Any] = {
             "Authorization": "Internal " + payload_token,
-            "Content-Type": "application/json"
-        }
-        payload: dict[str, Any] = {
-            "project_id": project_id,
-            "file_ids": file_ids
+            "Content-Type": "application/json",
         }
+        payload: dict[str, Any] = {"project_id": project_id, "file_ids": file_ids}
 
         response: requests.Response = requests.post(url, headers=headers, json=payload)
 
-        if response.status_code != 202: raise ConnectionError
+        if response.status_code != 202:
+            raise ConnectionError
 
         response_data: Any = response.json()
 
@@ -103,4 +112,27 @@ def _get_annotation(self, bucket_name: str, file_ids: list[str]) -> Any:
     @property
     def archive_id(self) -> Optional[str]:
         a_id: Any = self.__dict__.get("_archive_id")
-        if a_id: return str(a_id)
+        if a_id:
+            return str(a_id)
+
+
+class Hasher:
+    __slots__: tuple[str, ...] = ("bucket_name", "file_id", "file", "embedded")
+
+    def __init__(self, bucket_name: str, uid: str):
+        self.bucket_name = bucket_name
+        self.file_id = uid
+
+    async def get_file(self):
+        self.file = await Bucket(self.bucket_name).get_object(self.file_id)
+        assert self.file, "No file found"
+
+    async def hash(self):
+        match self.file.metadata.get("file_type"):
+            case "image": self.embedded = await self._image_hash()
+            case "video": self.embedded = await self._video_hash()
+            case _: raise ValueError("Unsupported file type")
+
+    async def _image_hash(self): return whash(await self.file.file.read())
+
+    async def _video_hash(self): return VHashPatch(self.file.file)
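
Hasher fetches one GridFS object and dispatches on its file_type metadata: images go through whash, videos through VHashPatch. A minimal sketch of driving it directly, outside Celery (the bucket name and id are placeholders):

    from asyncio import get_event_loop
    from shared.worker_services import Hasher

    hasher = Hasher("project_1", "66a9e3f2c4b5a6d7e8f90123")

    async def _run() -> None:
        await hasher.get_file()  # loads the GridFS object into hasher.file
        await hasher.hash()      # stores the perceptual hash in hasher.embedded

    get_event_loop().run_until_complete(_run())
    print(hasher.embedded)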
33 changes: 21 additions & 12 deletions storage-app/src/worker.py
@@ -1,28 +1,37 @@
 from celery import Celery
 from shared.settings import BROKER_URL, RESULT_URL, CELERY_CONFIG
-from shared.worker_services import Zipper
+from shared.worker_services import Zipper, Hasher
 from asyncio import get_event_loop
 
-if not BROKER_URL or not RESULT_URL: raise ValueError("no broker environment")
 
-WORKER: Celery = Celery()
+worker: Celery = Celery(**CELERY_CONFIG)
 
-WORKER.conf.broker_url = BROKER_URL
-WORKER.conf.result_backend = RESULT_URL
 
 
-@WORKER.task(name="produce_download_task")
+@worker.task(name="produce_download_task")
 def produce_download_task(bucket_name: str, file_ids: list[str]) -> str | None:
-    zipper: Zipper = Zipper(bucket_name, file_ids)
+    task: Zipper = Zipper(bucket_name, file_ids)
+    loop = get_event_loop()
+
+    async def _inner():
+        await task.archive_objects()
+        await task.write_archive()
+        task.delete_temp_zip()
+
+    loop.run_until_complete(_inner())
+    return task.archive_id
+
+
+@worker.task(name="produce_handle_media_task")
+def produce_handle_media_task(bucket_name: str, uid: str) -> None:
     loop = get_event_loop()
+    task: Hasher = Hasher(bucket_name, uid)
 
     async def _inner():
-        await zipper.archive_objects()
-        await zipper.write_archive()
-        zipper.delete_temp_zip()
+        await task.get_file()
+        await task.hash()
 
     loop.run_until_complete(_inner())
-    return zipper.archive_id
 
 
-if __name__ == "__main__": WORKER.worker_main(argv=CELERY_CONFIG)
+if __name__ == "__main__": worker.start()
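
With the worker running, the new task is invoked by its registered name, just like the existing download task. A minimal sketch of enqueueing it from any process pointed at the same broker (the bucket name and id are placeholders):

    from celery import Celery
    from shared.settings import CELERY_CONFIG

    app = Celery(**CELERY_CONFIG)

    # send_task() enqueues by name, so the caller never imports worker.py.
    result = app.send_task("produce_handle_media_task", args=["project_1", "66a9e3f2c4b5a6d7e8f90123"])
    result.get(timeout=60)  # the task returns None; get() just waits for completion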
