feat(agents-api): Use blob store for objects that are too big #645

Merged 13 commits on Oct 16, 2024
1 change: 1 addition & 0 deletions .env.example
@@ -61,6 +61,7 @@ LITELLM_REDIS_PASSWORD=<your_litellm_redis_password>
# INTEGRATION_SERVICE_URL=http://integrations:8000
# USE_BLOB_STORE_FOR_TEMPORAL=false
# BLOB_STORE_CUTOFF_KB=1024
# BLOB_STORE_BUCKET=agents-api

# Memory Store
# -----------
89 changes: 89 additions & 0 deletions agents-api/agents_api/clients/s3.py
@@ -0,0 +1,89 @@
from functools import cache, lru_cache

from beartype import beartype
import boto3
import botocore
from xxhash import xxh3_64_hexdigest as xxhash_key

from ..env import (
    s3_access_key,
    s3_endpoint,
    s3_secret_key,
    blob_store_bucket,
    blob_store_cutoff_kb,
)


@cache
def get_s3_client():
    return boto3.client(
        "s3",
        endpoint_url=s3_endpoint,
        aws_access_key_id=s3_access_key,
        aws_secret_access_key=s3_secret_key,
    )


def list_buckets() -> list[str]:
    client = get_s3_client()
    data = client.list_buckets()
    buckets = [bucket["Name"] for bucket in data["Buckets"]]

    return buckets


@cache
def setup():
    client = get_s3_client()
    if blob_store_bucket not in list_buckets():
        client.create_bucket(Bucket=blob_store_bucket)


@lru_cache(maxsize=10_000)
def exists(key: str) -> bool:
    client = get_s3_client()

    try:
        client.head_object(Bucket=blob_store_bucket, Key=key)
        return True

    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] == "404":
            return False
        else:
            raise e


@beartype
def add_object(key: str, body: bytes, replace: bool = False) -> None:
    client = get_s3_client()

    if replace:
        client.put_object(Bucket=blob_store_bucket, Key=key, Body=body)
        return

    if exists(key):
        return

    client.put_object(Bucket=blob_store_bucket, Key=key, Body=body)


@lru_cache(maxsize=256 * 1024 // blob_store_cutoff_kb)  # 256mb in cache
@beartype
def get_object(key: str) -> bytes:
    client = get_s3_client()
    return client.get_object(Bucket=blob_store_bucket, Key=key)["Body"].read()


@beartype
def delete_object(key: str) -> None:
    client = get_s3_client()
    client.delete_object(Bucket=blob_store_bucket, Key=key)


@beartype
def add_object_with_hash(body: bytes, replace: bool = False) -> str:
    key = xxhash_key(body)
    add_object(key, body, replace=replace)

    return key
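
A minimal usage sketch of the new client module, assuming the package is importable as agents_api, the configured S3-compatible endpoint is reachable, and the default agents-api bucket exists or can be created; the payload bytes are purely illustrative:

from agents_api.clients import s3

# Create the bucket on first use (the call is cached, so it is cheap to repeat).
s3.setup()

# Store a blob under its content-derived xxhash key and read it back.
payload = b"example payload that would otherwise bloat a Temporal event"
key = s3.add_object_with_hash(payload)

assert s3.exists(key)
assert s3.get_object(key) == payload

# Remove the illustrative object again.
s3.delete_object(key)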
8 changes: 8 additions & 0 deletions agents-api/agents_api/env.py
@@ -25,6 +25,14 @@
# -----
task_max_parallelism: int = env.int("AGENTS_API_TASK_MAX_PARALLELISM", default=100)

# Blob Store
# ----------
use_blob_store_for_temporal: bool = env.bool("USE_BLOB_STORE_FOR_TEMPORAL", default=False)
blob_store_bucket: str = env.str("BLOB_STORE_BUCKET", default="agents-api")
blob_store_cutoff_kb: int = env.int("BLOB_STORE_CUTOFF_KB", default=1024)
s3_endpoint: str = env.str("S3_ENDPOINT", default="http://seaweedfs:8333")
s3_access_key: str | None = env.str("S3_ACCESS_KEY", default=None)
s3_secret_key: str | None = env.str("S3_SECRET_KEY", default=None)

# Debug
# -----
32 changes: 30 additions & 2 deletions agents-api/agents_api/worker/codec.py
@@ -22,13 +22,41 @@
    EncodingPayloadConverter,
)

from ..clients import s3
from ..env import use_blob_store_for_temporal, blob_store_cutoff_kb


if use_blob_store_for_temporal:
    s3.setup()


@dataclasses.dataclass
class RemoteObject:
    key: str


def serialize(x: Any) -> bytes:
    return compress(pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL))
    data = compress(pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL))
    data_size = sys.getsizeof(data)

    if use_blob_store_for_temporal and data_size > blob_store_cutoff_kb * 1024:
print("-" * 100)
print("YAYAYAYA: Using blob store for temporal")
print("-" * 100)

        key = s3.add_object_with_hash(data)
        return serialize(RemoteObject(key=key))

    return data


def deserialize(b: bytes) -> Any:
    return pickle.loads(decompress(b))
    object = pickle.loads(decompress(b))

    if isinstance(object, RemoteObject):
        return deserialize(s3.get_object(object.key))

    return object


def from_payload_data(data: bytes, type_hint: Optional[Type] = None) -> Any:
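
A minimal round-trip sketch of the new codec behavior, assuming USE_BLOB_STORE_FOR_TEMPORAL is enabled, the blob store is reachable, and that the compressed pickle of the hypothetical big_payload exceeds BLOB_STORE_CUTOFF_KB:

import os

from agents_api.worker.codec import serialize, deserialize

# ~2 MiB of incompressible data, comfortably above the 1024 KB default cutoff.
big_payload = {"blob": os.urandom(2 * 1024 * 1024)}

# serialize() uploads the compressed pickle to the blob store and returns
# bytes that encode only a RemoteObject carrying the content-hash key.
wire_bytes = serialize(big_payload)

# deserialize() detects the RemoteObject, fetches the blob by key,
# and recursively decodes the original value.
assert deserialize(wire_bytes) == big_payload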
6 changes: 6 additions & 0 deletions agents-api/docker-compose.yml
@@ -22,6 +22,12 @@ x--shared-environment: &shared-environment
  TEMPORAL_WORKER_URL: ${TEMPORAL_WORKER_URL:-temporal:7233}
  TRUNCATE_EMBED_TEXT: ${TRUNCATE_EMBED_TEXT:-True}
  WORKER_URL: ${WORKER_URL:-temporal:7233}
  USE_BLOB_STORE_FOR_TEMPORAL: ${USE_BLOB_STORE_FOR_TEMPORAL:-false}
  BLOB_STORE_CUTOFF_KB: ${BLOB_STORE_CUTOFF_KB:-1024}
  BLOB_STORE_BUCKET: ${BLOB_STORE_BUCKET:-agents-api}
  S3_ENDPOINT: ${S3_ENDPOINT:-http://seaweedfs:8333}
  S3_ACCESS_KEY: ${S3_ACCESS_KEY}
  S3_SECRET_KEY: ${S3_SECRET_KEY}

x--base-agents-api: &base-agents-api
  image: julepai/agents-api:${TAG:-dev}