Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(agents-api): Fix blob store, ifelse branch, temporal postgres #882

Merged
merged 2 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion agents-api/agents_api/activities/execute_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
)
from ..autogen.Sessions import CreateSessionRequest
from ..autogen.Tools import SystemDef
from ..common.protocol.remote import RemoteObject
from ..common.protocol.tasks import StepContext
from ..common.storage_handler import auto_blob_store
from ..common.storage_handler import auto_blob_store, load_from_blob_store_if_remote
from ..env import testing
from ..models.developer import get_developer
from .utils import get_handler
Expand All @@ -33,6 +34,10 @@ async def execute_system(
) -> Any:
"""Execute a system call with the appropriate handler and transformed arguments."""
arguments: dict[str, Any] = system.arguments or {}

if set(arguments.keys()) == {"bucket", "key"}:
arguments = load_from_blob_store_if_remote(arguments)

arguments["developer_id"] = context.execution_input.developer_id

# Unbox all the arguments
Expand Down
4 changes: 4 additions & 0 deletions agents-api/agents_api/common/storage_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ def load_from_blob_store_if_remote(x: Any | RemoteObject) -> Any:
elif isinstance(x, RemoteList):
x = list(x)

elif isinstance(x, dict) and set(x.keys()) == {"bucket", "key"}:
creatorrr marked this conversation as resolved.
Show resolved Hide resolved
fetched = s3.get_object(x["key"])
return deserialize(fetched)

return x


Expand Down
4 changes: 2 additions & 2 deletions agents-api/agents_api/models/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def is_resource_busy(e: Exception) -> bool:
return isinstance(e, HTTPException) and e.status_code == 429

@retry(
stop=stop_after_attempt(2),
stop=stop_after_attempt(4),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception(is_resource_busy),
)
Expand Down Expand Up @@ -254,7 +254,7 @@ def wrapper(*args: P.args, client=None, **kwargs: P.kwargs) -> pd.DataFrame:

debug and print(repr(e))

if "busy" in str(getattr(e, "resp", e)).lower():
if "busy" in (str(e) + str(getattr(e, "resp", e))).lower():
raise HTTPException(
status_code=429, detail="Resource busy. Please try again later."
) from e
Expand Down
6 changes: 5 additions & 1 deletion agents-api/agents_api/workflows/task_execution/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
with workflow.unsafe.imports_passed_through():
from ...activities import task_steps
from ...autogen.openapi_model import (
EvaluateStep,
TransitionTarget,
Workflow,
WorkflowStep,
Expand Down Expand Up @@ -90,14 +91,17 @@ async def execute_if_else_branch(
context: StepContext,
execution_input: ExecutionInput,
then_branch: WorkflowStep,
else_branch: WorkflowStep,
else_branch: WorkflowStep | None,
condition: bool,
previous_inputs: RemoteList | list[Any],
user_state: dict[str, Any] = {},
) -> Any:
workflow.logger.info(f"If-Else step: Condition evaluated to {condition}")
chosen_branch = then_branch if condition else else_branch

if chosen_branch is None:
chosen_branch = EvaluateStep(evaluate={"output": "_"})

if_else_wf_name = f"`{context.cursor.workflow}`[{context.cursor.step}].if_else"
if_else_wf_name += ".then" if condition else ".else"

Expand Down
10 changes: 9 additions & 1 deletion memory-store/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ services:
ports:
- "9070:9070"

develop:
watch:
- action: sync+restart
path: ./options
target: /data/cozo.db/options
- action: rebuild
path: Dockerfile

labels:
ofelia.enabled: "true"
ofelia.job-exec.backupcron.schedule: "@every 3h"
Expand All @@ -35,4 +43,4 @@ volumes:
cozo_data:
external: true
cozo_backup:
external: true
external: true
8 changes: 4 additions & 4 deletions memory-store/options
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
compaction_readahead_size=2097152
strict_bytes_per_sync=false
bytes_per_sync=1048576
max_background_jobs=16
max_background_jobs=32
avoid_flush_during_shutdown=false
max_background_flushes=-1
delayed_write_rate=16777216
max_open_files=-1
max_subcompactions=2
max_subcompactions=8
writable_file_max_buffer_size=5048576
wal_bytes_per_sync=0
max_background_compactions=-1
Expand Down Expand Up @@ -100,10 +100,10 @@
memtable_protection_bytes_per_key=0
target_file_size_multiplier=1
report_bg_io_stats=false
write_buffer_size=267108864
write_buffer_size=534217728
memtable_huge_page_size=0
max_successive_merges=0
max_write_buffer_number=25
max_write_buffer_number=50
prefix_extractor=rocksdb.CappedPrefix.9
bottommost_compression_opts={checksum=false;max_dict_buffer_bytes=0;enabled=false;max_dict_bytes=0;max_compressed_bytes_per_kb=896;parallel_threads=1;zstd_max_train_bytes=0;level=32767;use_zstd_dict_trainer=true;strategy=0;window_bits=-14;}
paranoid_file_checks=false
Expand Down
1 change: 1 addition & 0 deletions scheduler/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,4 @@ services:

volumes:
temporal-db-data:
external: true