Backend logic for multipart upload: 3 new endpoints, one for creating the signed URLs, another for completing the multipart upload, and a third for aborting it.
shincap8 committed Jan 28, 2025
1 parent dc1c005 commit 658a323
Showing 3 changed files with 207 additions and 27 deletions.
66 changes: 66 additions & 0 deletions backend/app/api/endpoints/base/model.py
@@ -5,14 +5,18 @@
from fastapi.responses import FileResponse

from app.domain.schemas.base.model import (
AbortMultipartRequest,
BatchCreateExampleRequest,
BatchURLsResponse,
CompleteBigModelUploadRequest,
ConversationWithBufferMemoryRequest,
DownloadAllExamplesRequest,
ModelInTheLoopRequest,
ModelPredictionPerDatasetRequest,
SingleModelEvaluationRequest,
SingleModelEvaluationResponse,
UpdateModelInfoRequest,
UploadBigModelRequest,
UploadModelToS3AndEvaluateRequest,
)
from app.domain.services.base.model import ModelService
@@ -121,6 +125,68 @@ def heavy_evaluation(
return "The model will be evaluated in the background"


@router.post("/initiate-mutipart-upload", response_model=BatchURLsResponse)
def initiate_multipart_upload(model: UploadBigModelRequest):
return ModelService().initiate_multipart_upload(
model.model_name,
model.file_name,
model.content_type,
model.user_id,
model.task_code,
model.parts_count,
)


@router.post("/abort-mutipart-upload")
def abort_multipart_upload(model: AbortMultipartRequest):
return ModelService().abort_multipart_upload(
model.upload_id,
model.task_code,
model.model_name,
model.user_id,
model.file_name,
)


@router.post("/complete-multipart-upload")
def complete_multipart_upload(
model: CompleteBigModelUploadRequest,
background_tasks: BackgroundTasks,
):
ModelService().complete_multipart_upload(
model.upload_id,
model.parts,
model.user_id,
model.task_code,
model.model_name,
model.file_name,
)
data = ModelService().create_model(
model.model_name,
model.description,
model.num_paramaters,
model.languages,
model.license,
model.file_name,
model.user_id,
model.task_code,
)
background_tasks.add_task(
ModelService().run_heavy_evaluation,
data["model_path"],
data["model_id"],
data["save_s3_path"],
data["inference_url"],
data["metadata_url"],
)
background_tasks.add_task(
ModelService().send_uploaded_model_email,
data["user_email"],
data["model_name"],
)
return "The model will be evaluated in the background"


@router.get("/initiate_lambda_models")
def initiate_lambda_models() -> None:
return ModelService().initiate_lambda_models()
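
Taken together, the three endpoints above implement the usual S3 multipart flow: initiate to get an upload_id and one pre-signed URL per part, PUT each chunk to its URL while collecting the returned ETags, then complete (or abort on failure). A rough client-side sketch of that flow follows; the base URL, the /model prefix, the chunk size, and the content type are assumptions, not part of this commit.

# Client-side sketch only (not part of this commit); BASE_URL, the /model
# prefix, CHUNK_SIZE and the content type are assumed values.
import math
import os

import requests

BASE_URL = "http://localhost:8000/model"  # assumed mount point of this router
CHUNK_SIZE = 50 * 1024 * 1024  # assumed part size; S3 parts must be >= 5 MB except the last


def upload_big_model(path: str, model_name: str, user_id: int, task_code: str):
    file_name = os.path.basename(path)
    parts_count = math.ceil(os.path.getsize(path) / CHUNK_SIZE)

    # 1. Ask the backend for an upload_id and one pre-signed URL per part.
    init = requests.post(f"{BASE_URL}/initiate-multipart-upload", json={
        "model_name": model_name, "file_name": file_name,
        "content_type": "application/zip", "user_id": user_id,
        "task_code": task_code, "parts_count": parts_count,
    }).json()
    upload_id, urls = init["upload_id"], init["urls"]

    parts = []
    try:
        # 2. PUT each chunk to its pre-signed URL and record the returned ETag.
        with open(path, "rb") as f:
            for part_number, url in enumerate(urls, start=1):
                resp = requests.put(url, data=f.read(CHUNK_SIZE))
                resp.raise_for_status()
                parts.append({"ETag": resp.headers["ETag"], "PartNumber": part_number})
    except Exception:
        # 3a. On any failure, ask the backend to abort so S3 discards the parts.
        requests.post(f"{BASE_URL}/abort-multipart-upload", json={
            "upload_id": upload_id, "task_code": task_code,
            "model_name": model_name, "user_id": user_id, "file_name": file_name,
        })
        raise

    # 3b. Otherwise stitch the parts together and kick off evaluation.
    requests.post(f"{BASE_URL}/complete-multipart-upload", json={
        "upload_id": upload_id, "parts": parts, "model_name": model_name,
        "description": "", "num_paramaters": 0, "languages": "", "license": "",
        "file_name": file_name, "user_id": user_id, "task_code": task_code,
    })
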
46 changes: 43 additions & 3 deletions backend/app/domain/schemas/base/model.py
@@ -1,7 +1,7 @@
# Copyright (c) MLCommons and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from typing import Optional, Union
from typing import List, Optional, Union

from fastapi import File, UploadFile
from pydantic import BaseModel
@@ -35,8 +35,7 @@ class ModelInTheLoopRequest(BaseModel):
task_id: int


@form_body
class UploadModelToS3AndEvaluateRequest(BaseModel):
class BaseforModelRequest(BaseModel):
model_name: Optional[str]
description: Optional[str]
num_paramaters: Optional[float]
@@ -45,6 +44,10 @@ class UploadModelToS3AndEvaluateRequest(BaseModel):
file_name: str
user_id: int
task_code: str


@form_body
class UploadModelToS3AndEvaluateRequest(BaseforModelRequest):
file_to_upload: UploadFile = File(...)


@@ -86,3 +89,40 @@ class UpdateModelInfoRequest(BaseModel):

class DownloadAllExamplesRequest(BaseModel):
task_id: int


class UploadBigModelRequest(BaseModel):
model_name: str
file_name: str
content_type: str
user_id: int
task_code: str
parts_count: int


class PreSignedURL(BaseModel):
batch_number: int
batch_presigned_url: str


class BatchURLsResponse(BaseModel):
upload_id: str
urls: List[str]


class FilePart(BaseModel):
ETag: str
PartNumber: int


class CompleteBigModelUploadRequest(BaseforModelRequest):
upload_id: str
parts: List[FilePart]


class AbortMultipartRequest(BaseModel):
upload_id: str
task_code: str
model_name: str
user_id: int
file_name: str
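
For reference, a minimal sketch of how the new schemas fit together when completing an upload; every field value below is invented for illustration.

# Illustration only; all values are made up.
from app.domain.schemas.base.model import CompleteBigModelUploadRequest, FilePart

request = CompleteBigModelUploadRequest(
    model_name="example-model",
    description="demo upload",
    num_paramaters=7.0,
    languages="en",
    license="mit",
    file_name="example-model.zip",
    user_id=1,
    task_code="example_task",
    upload_id="example-upload-id",
    parts=[
        FilePart(ETag='"9b2cf535f27731c974343645a3985328"', PartNumber=1),
        FilePart(ETag='"d41d8cd98f00b204e9800998ecf8427e"', PartNumber=2),
    ],
)
# ModelService.complete_multipart_upload sorts request.parts by PartNumber and
# calls .dict() on each FilePart before handing the list to S3.
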
122 changes: 98 additions & 24 deletions backend/app/domain/services/base/model.py
@@ -10,6 +10,7 @@
from typing import List

import boto3
import boto3.session
import requests
import yaml
from fastapi import HTTPException, UploadFile
@@ -62,7 +63,9 @@ def __init__(self):
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
region_name=os.getenv("AWS_REGION"),
)
self.s3 = self.session.client("s3")
self.s3 = self.session.client(
"s3", config=boto3.session.Config(signature_version="s3v4")
)
self.s3_bucket = os.getenv("AWS_S3_BUCKET")
self.email_helper = EmailHelper()
self.providers = {
@@ -655,40 +658,73 @@ def initiate_multipart_upload(
model_name_clean = re.sub(r"_+", "_", model_name_clean)

model_path = f"{task_code}/submited_models/{task_id}-{user_id}-{model_name}-{clean_file_name}"

response = self.s3.create_multipart_upload(
Bucket=task_s3_bucket, Key=model_path, ContentType=content_type
)
upload_id = response["UploadId"]

urls = []
for part_number in range(1, parts_count):
presigned_url = self.s3.generate_presigned_url(
"upload_part",
Params={
"Bucket": task_s3_bucket,
"Key": model_path,
"UploadId": upload_id,
"PartNumber": part_number,
},
ExpiresIn=3600,
try:
response = self.s3.create_multipart_upload(
Bucket=task_s3_bucket, Key=model_path, ContentType=content_type
)
urls.append(presigned_url)
upload_id = response["UploadId"]

urls = []
for part_number in range(1, parts_count + 1):
presigned_url = self.s3.generate_presigned_url(
"upload_part",
Params={
"Bucket": task_s3_bucket,
"Key": model_path,
"UploadId": upload_id,
"PartNumber": part_number,
},
ExpiresIn=3600,
HttpMethod="put",
)
urls.append(presigned_url)
except Exception as e:
print("There was an error while generating the pre-signed URLs:", e)
raise HTTPException(
status_code=500, detail=f"Failed to initiate multipart upload: {str(e)}"
)

return
return {"upload_id": upload_id, "urls": urls}

def complete_multipart_upload(
self, upload_id: int, parts: List, bucket_name: str, object_key: str
self,
upload_id: str,
parts: List,
user_id: int,
task_code: str,
model_name: str,
file_name: str,
):
task_id = self.task_repository.get_task_id_by_task_code(task_code)[0]
yaml_file = self.task_repository.get_config_file_by_task_id(task_id)[0]
yaml_file = yaml.safe_load(yaml_file)
task_s3_bucket = self.task_repository.get_s3_bucket_by_task_id(task_id)[0]

file_name = file_name.lower()
file_name = file_name.replace("/", ":")
file_name = re.sub(r"\s+", "_", file_name)
clean_file_name = re.sub(r"_+", "_", file_name)

model_name_clean = model_name.lower()
model_name_clean = model_name_clean.replace("/", ":")
model_name_clean = re.sub(r"\s+", "_", model_name_clean)
model_name_clean = re.sub(r"_+", "_", model_name_clean)

model_path = f"{task_code}/submited_models/{task_id}-{user_id}-{model_name}-{clean_file_name}"

parts = sorted(parts, key=lambda x: x.PartNumber)
parts = [p.dict() for p in parts]

try:
self.s3.complete_multipart_upload(
Bucket=bucket_name,
Key=object_key,
Bucket=task_s3_bucket,
Key=model_path,
UploadId=upload_id,
MultipartUpload={"Parts": parts},
)
return {"message": "Upload completed successfully"}
self.s3.head_object(
Bucket=task_s3_bucket,
Key=model_path,
)
except Exception as e:
print("Failed to complete upload:", e)
raise HTTPException(
status_code=500, detail=f"Failed to complete upload: {str(e)}"
)
@@ -749,7 +785,45 @@ def create_model(
"user_email": user_email,
"inference_url": inference_url,
"metadata_url": metadata_url,
"s3_bucket": task_s3_bucket,
}
except Exception as e:
print(f"An unexpected error occurred: {e}")
return "Model upload failed"

def abort_multipart_upload(
self,
upload_id: str,
task_code: str,
model_name: str,
user_id: int,
file_name: str,
):
task_id = self.task_repository.get_task_id_by_task_code(task_code)[0]
yaml_file = self.task_repository.get_config_file_by_task_id(task_id)[0]
yaml_file = yaml.safe_load(yaml_file)
task_s3_bucket = self.task_repository.get_s3_bucket_by_task_id(task_id)[0]

file_name = file_name.lower()
file_name = file_name.replace("/", ":")
file_name = re.sub(r"\s+", "_", file_name)
clean_file_name = re.sub(r"_+", "_", file_name)

model_name_clean = model_name.lower()
model_name_clean = model_name_clean.replace("/", ":")
model_name_clean = re.sub(r"\s+", "_", model_name_clean)
model_name_clean = re.sub(r"_+", "_", model_name_clean)

model_path = f"{task_code}/submited_models/{task_id}-{user_id}-{model_name}-{clean_file_name}"

try:
self.s3.abort_multipart_upload(
Bucket=task_s3_bucket,
Key=model_path,
UploadId=upload_id,
)
return {"message": "Multipart upload aborted successfully."}
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to abort upload: {str(e)}"
)
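
One design note: the file-name/model-name normalization and the model_path key construction are repeated verbatim in initiate_multipart_upload, complete_multipart_upload and abort_multipart_upload. A shared helper is sketched below (a hypothetical refactor, not part of this commit); it mirrors the current behaviour, including the fact that the key interpolates the raw model_name rather than the cleaned variant.

# Hypothetical helper, not in this commit: factor out the duplicated
# normalization and S3 key construction used by the three multipart methods.
import re


def build_model_path(task_code: str, task_id: int, user_id: int,
                     model_name: str, file_name: str) -> str:
    def clean(value: str) -> str:
        value = value.lower().replace("/", ":")
        value = re.sub(r"\s+", "_", value)
        return re.sub(r"_+", "_", value)

    clean_file_name = clean(file_name)
    # The current code uses the raw model_name (not the cleaned one) in the key,
    # and the "submited_models" spelling matches the existing bucket layout,
    # so both are preserved here.
    return f"{task_code}/submited_models/{task_id}-{user_id}-{model_name}-{clean_file_name}"
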
