Skip to content

Commit

Permalink
feat(agents-api): Use blob store for objects that are too big (#645)
Browse files Browse the repository at this point in the history
Fixes #579 

Signed-off-by: Diwank Singh Tomer <[email protected]>

<!-- ELLIPSIS_HIDDEN -->


----

> [!IMPORTANT]
> Integrates blob store for large object handling in agents-api with S3
client and updated serialization logic.
> 
>   - **Blob Store Integration**:
> - Adds `s3.py` for S3 client operations: `get_s3_client()`,
`list_buckets()`, `setup()`, `exists()`, `add_object()`, `get_object()`,
`delete_object()`, and `add_object_with_hash()`.
> - Updates `env.py` to include blob store configurations:
`use_blob_store_for_temporal`, `blob_store_bucket`,
`blob_store_cutoff_kb`, `s3_endpoint`, `s3_access_key`, `s3_secret_key`.
> - Modifies `codec.py` to use blob store for serialization if
`use_blob_store_for_temporal` is true and object size exceeds
`blob_store_cutoff_kb`.
>   - **Functionality**:
> - Decorator `auto_blob_store` in `storage_handler.py` to automatically
store large outputs in blob store and load inputs from blob store if
they are `RemoteObject`.
> - Applies `auto_blob_store` to multiple functions in `activities` and
`task_steps` to handle large data.
>   - **Configuration**:
> - Updates `.env.example` and `docker-compose.yml` to include blob
store environment variables.
>     - Adds `boto3` and `xxhash` to `pyproject.toml` dependencies.
> 
> <sup>This description was created by </sup>[<img alt="Ellipsis"
src="https://img.shields.io/badge/Ellipsis-blue?color=175173">](https://www.ellipsis.dev?ref=julep-ai%2Fjulep&utm_source=github&utm_medium=referral)<sup>
for 2ea0155. It will automatically
update as commits are pushed.</sup>


<!-- ELLIPSIS_HIDDEN -->

---------

Signed-off-by: Diwank Singh Tomer <[email protected]>
Co-authored-by: creatorrr <[email protected]>
Co-authored-by: HamadaSalhab <[email protected]>
Co-authored-by: HamadaSalhab <[email protected]>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
5 people authored Oct 16, 2024
1 parent fd64463 commit cd6045d
Show file tree
Hide file tree
Showing 34 changed files with 2,302 additions and 1,171 deletions.
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ LITELLM_REDIS_PASSWORD=<your_litellm_redis_password>
# INTEGRATION_SERVICE_URL=http://integrations:8000
# USE_BLOB_STORE_FOR_TEMPORAL=false
# BLOB_STORE_CUTOFF_KB=1024
# BLOB_STORE_BUCKET=agents-api

# Memory Store
# -----------
Expand Down
996 changes: 619 additions & 377 deletions README-CN.md

Large diffs are not rendered by default.

1,004 changes: 617 additions & 387 deletions README-FR.md

Large diffs are not rendered by default.

1,011 changes: 622 additions & 389 deletions README-JA.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions agents-api/agents_api/activities/embed_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
from temporalio import activity

from ..clients import cozo, litellm
from ..common.storage_handler import auto_blob_store
from ..env import testing
from ..models.docs.embed_snippets import embed_snippets as embed_snippets_query
from .types import EmbedDocsPayload


@auto_blob_store
@beartype
async def embed_docs(
payload: EmbedDocsPayload, cozo_client=None, max_batch_size: int = 100
Expand Down
6 changes: 2 additions & 4 deletions agents-api/agents_api/activities/excecute_api_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@
from temporalio import activity

from ..autogen.openapi_model import ApiCallDef

# from ..clients import integrations
from ..common.storage_handler import auto_blob_store
from ..env import testing

# from ..models.tools import get_tool_args_from_metadata


class RequestArgs(TypedDict):
content: Optional[str]
Expand All @@ -23,6 +20,7 @@ class RequestArgs(TypedDict):
headers: Optional[dict[str, str]]


@auto_blob_store
@beartype
async def execute_api_call(
api_call: ApiCallDef,
Expand Down
2 changes: 2 additions & 0 deletions agents-api/agents_api/activities/execute_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
from ..autogen.openapi_model import IntegrationDef
from ..clients import integrations
from ..common.protocol.tasks import StepContext
from ..common.storage_handler import auto_blob_store
from ..env import testing
from ..models.tools import get_tool_args_from_metadata


@auto_blob_store
@beartype
async def execute_integration(
context: StepContext,
Expand Down
2 changes: 2 additions & 0 deletions agents-api/agents_api/activities/execute_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
)
from ..autogen.Tools import SystemDef
from ..common.protocol.tasks import StepContext
from ..common.storage_handler import auto_blob_store
from ..env import testing
from ..models.agent.create_agent import create_agent as create_agent_query
from ..models.agent.delete_agent import delete_agent as delete_agent_query
Expand Down Expand Up @@ -41,6 +42,7 @@
from ..routers.docs.search_docs import search_agent_docs, search_user_docs


@auto_blob_store
@beartype
async def execute_system(
context: StepContext,
Expand Down
2 changes: 2 additions & 0 deletions agents-api/agents_api/activities/task_steps/base_evaluate.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
from openai import BaseModel
from temporalio import activity

from ...common.storage_handler import auto_blob_store
from ...env import testing
from ..utils import get_evaluator


@auto_blob_store
@beartype
async def base_evaluate(
exprs: str | list[str] | dict[str, str] | dict[str, dict[str, str]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
from temporalio import activity

from ... import models
from ...common.storage_handler import auto_blob_store
from ...env import testing


@auto_blob_store
@beartype
async def cozo_query_step(
query_name: str,
Expand Down
2 changes: 2 additions & 0 deletions agents-api/agents_api/activities/task_steps/evaluate_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

from ...activities.utils import simple_eval_dict
from ...common.protocol.tasks import StepContext, StepOutcome
from ...common.storage_handler import auto_blob_store
from ...env import testing


@auto_blob_store
@beartype
async def evaluate_step(
context: StepContext,
Expand Down
2 changes: 2 additions & 0 deletions agents-api/agents_api/activities/task_steps/for_each_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
StepContext,
StepOutcome,
)
from ...common.storage_handler import auto_blob_store
from ...env import testing
from .base_evaluate import base_evaluate


@auto_blob_store
@beartype
async def for_each_step(context: StepContext) -> StepOutcome:
try:
Expand Down
2 changes: 2 additions & 0 deletions agents-api/agents_api/activities/task_steps/get_value_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
from temporalio import activity

from ...common.protocol.tasks import StepContext, StepOutcome
from ...common.storage_handler import auto_blob_store
from ...env import testing


# TODO: We should use this step to query the parent workflow and get the value from the workflow context
# SCRUM-1
@auto_blob_store
@beartype
async def get_value_step(
context: StepContext,
Expand Down
2 changes: 2 additions & 0 deletions agents-api/agents_api/activities/task_steps/if_else_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
StepContext,
StepOutcome,
)
from ...common.storage_handler import auto_blob_store
from ...env import testing
from .base_evaluate import base_evaluate


@auto_blob_store
@beartype
async def if_else_step(context: StepContext) -> StepOutcome:
# NOTE: This activity is only for logging, so we just evaluate the expression
Expand Down
2 changes: 2 additions & 0 deletions agents-api/agents_api/activities/task_steps/log_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
StepContext,
StepOutcome,
)
from ...common.storage_handler import auto_blob_store
from ...common.utils.template import render_template
from ...env import testing


@auto_blob_store
@beartype
async def log_step(context: StepContext) -> StepOutcome:
# NOTE: This activity is only for logging, so we just evaluate the expression
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
StepContext,
StepOutcome,
)
from ...common.storage_handler import auto_blob_store
from ...env import testing
from .base_evaluate import base_evaluate


@auto_blob_store
@beartype
async def map_reduce_step(context: StepContext) -> StepOutcome:
try:
Expand Down
2 changes: 2 additions & 0 deletions agents-api/agents_api/activities/task_steps/prompt_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
litellm, # We dont directly import `acompletion` so we can mock it
)
from ...common.protocol.tasks import StepContext, StepOutcome
from ...common.storage_handler import auto_blob_store
from ...common.utils.template import render_template
from ...models.tools.list_tools import list_tools

Expand All @@ -28,6 +29,7 @@ def format_agent_tool(tool: Tool) -> dict:


@activity.defn
@auto_blob_store
@beartype
async def prompt_step(context: StepContext) -> StepOutcome:
# Get context data
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import base64

from beartype import beartype
from temporalio import activity

from ...autogen.openapi_model import CreateTransitionRequest
from ...common.protocol.tasks import (
StepContext,
StepOutcome,
)
from ...common.storage_handler import auto_blob_store
from .transition_step import original_transition_step


@activity.defn
@auto_blob_store
@beartype
async def raise_complete_async(context: StepContext, output: StepOutcome) -> None:
activity_info = activity.info()

Expand Down
6 changes: 4 additions & 2 deletions agents-api/agents_api/activities/task_steps/return_step.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
from beartype import beartype
from temporalio import activity

from ...autogen.openapi_model import ReturnStep
from ...common.protocol.tasks import (
StepContext,
StepOutcome,
)
from ...common.storage_handler import auto_blob_store
from ...env import testing
from .base_evaluate import base_evaluate


@auto_blob_store
@beartype
async def return_step(context: StepContext) -> StepOutcome:
# NOTE: This activity is only for returning immediately, so we just evaluate the expression
# Hence, it's a local activity and SHOULD NOT fail
try:
assert isinstance(context.current_step, ReturnStep)

Expand Down
2 changes: 2 additions & 0 deletions agents-api/agents_api/activities/task_steps/set_value_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

from ...activities.utils import simple_eval_dict
from ...common.protocol.tasks import StepContext, StepOutcome
from ...common.storage_handler import auto_blob_store
from ...env import testing


# TODO: We should use this step to signal to the parent workflow and set the value on the workflow context
# SCRUM-2
@auto_blob_store
@beartype
async def set_value_step(
context: StepContext,
Expand Down
4 changes: 2 additions & 2 deletions agents-api/agents_api/activities/task_steps/switch_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
StepContext,
StepOutcome,
)
from ...common.storage_handler import auto_blob_store
from ...env import testing
from ..utils import get_evaluator


@auto_blob_store
@beartype
async def switch_step(context: StepContext) -> StepOutcome:
try:
Expand All @@ -36,8 +38,6 @@ async def switch_step(context: StepContext) -> StepOutcome:
return StepOutcome(error=str(e))


# Note: This is here just for clarity. We could have just imported switch_step directly
# They do the same thing, so we dont need to mock the switch_step function
mock_switch_step = switch_step

switch_step = activity.defn(name="switch_step")(
Expand Down
2 changes: 2 additions & 0 deletions agents-api/agents_api/activities/task_steps/tool_call_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
StepContext,
StepOutcome,
)
from ...common.storage_handler import auto_blob_store


# FIXME: This shouldn't be here.
Expand Down Expand Up @@ -44,6 +45,7 @@ def construct_tool_call(tool: TaskToolDef, arguments: dict, call_id: str) -> dic


@activity.defn
@auto_blob_store
@beartype
async def tool_call_step(context: StepContext) -> StepOutcome:
assert isinstance(context.current_step, ToolCallStep)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from beartype import beartype
from temporalio import activity

from ...autogen.openapi_model import WaitForInputStep
from ...common.protocol.tasks import StepContext, StepOutcome
from ...common.storage_handler import auto_blob_store
from ...env import testing
from .base_evaluate import base_evaluate


@auto_blob_store
@beartype
async def wait_for_input_step(context: StepContext) -> StepOutcome:
try:
assert isinstance(context.current_step, WaitForInputStep)
Expand All @@ -21,8 +25,6 @@ async def wait_for_input_step(context: StepContext) -> StepOutcome:
return StepOutcome(error=str(e))


# Note: This is here just for clarity. We could have just imported wait_for_input_step directly
# They do the same thing, so we dont need to mock the wait_for_input_step function
mock_wait_for_input_step = wait_for_input_step

wait_for_input_step = activity.defn(name="wait_for_input_step")(
Expand Down
7 changes: 3 additions & 4 deletions agents-api/agents_api/activities/task_steps/yield_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@
from beartype import beartype
from temporalio import activity

from agents_api.autogen.openapi_model import TransitionTarget, YieldStep

from ...autogen.openapi_model import TransitionTarget, YieldStep
from ...common.protocol.tasks import StepContext, StepOutcome
from ...common.storage_handler import auto_blob_store
from ...env import testing
from .base_evaluate import base_evaluate


@auto_blob_store
@beartype
async def yield_step(context: StepContext) -> StepOutcome:
# NOTE: This activity is only for returning immediately, so we just evaluate the expression
# Hence, it's a local activity and SHOULD NOT fail
try:
assert isinstance(context.current_step, YieldStep)

Expand Down
Loading

0 comments on commit cd6045d

Please sign in to comment.