diff --git a/agents-api/agents_api/activities/execute_system.py b/agents-api/agents_api/activities/execute_system.py
index e42891d92..bcf7855d2 100644
--- a/agents-api/agents_api/activities/execute_system.py
+++ b/agents-api/agents_api/activities/execute_system.py
@@ -18,8 +18,9 @@
 )
 from ..autogen.Sessions import CreateSessionRequest
 from ..autogen.Tools import SystemDef
+from ..common.protocol.remote import RemoteObject
 from ..common.protocol.tasks import StepContext
-from ..common.storage_handler import auto_blob_store
+from ..common.storage_handler import auto_blob_store, load_from_blob_store_if_remote
 from ..env import testing
 from ..models.developer import get_developer
 from .utils import get_handler
@@ -33,6 +34,10 @@ async def execute_system(
 ) -> Any:
     """Execute a system call with the appropriate handler and transformed arguments."""
     arguments: dict[str, Any] = system.arguments or {}
+
+    if set(arguments.keys()) == {"bucket", "key"}:
+        arguments = load_from_blob_store_if_remote(arguments)
+
     arguments["developer_id"] = context.execution_input.developer_id
 
     # Unbox all the arguments
diff --git a/agents-api/agents_api/common/storage_handler.py b/agents-api/agents_api/common/storage_handler.py
index 894b0ff72..132c44e79 100644
--- a/agents-api/agents_api/common/storage_handler.py
+++ b/agents-api/agents_api/common/storage_handler.py
@@ -44,6 +44,10 @@ def load_from_blob_store_if_remote(x: Any | RemoteObject) -> Any:
     elif isinstance(x, RemoteList):
         x = list(x)
 
+    elif isinstance(x, dict) and set(x.keys()) == {"bucket", "key"}:
+        fetched = s3.get_object(x["key"])
+        return deserialize(fetched)
+
     return x
 
diff --git a/agents-api/agents_api/models/utils.py b/agents-api/agents_api/models/utils.py
index ef38f9baf..92fcda152 100644
--- a/agents-api/agents_api/models/utils.py
+++ b/agents-api/agents_api/models/utils.py
@@ -213,7 +213,7 @@ def is_resource_busy(e: Exception) -> bool:
     return isinstance(e, HTTPException) and e.status_code == 429
 
 @retry(
-    stop=stop_after_attempt(2),
+    stop=stop_after_attempt(4),
     wait=wait_exponential(multiplier=1, min=4, max=10),
     retry=retry_if_exception(is_resource_busy),
 )
@@ -254,7 +254,7 @@ def wrapper(*args: P.args, client=None, **kwargs: P.kwargs) -> pd.DataFrame:
 
             debug and print(repr(e))
 
-            if "busy" in str(getattr(e, "resp", e)).lower():
+            if "busy" in (str(e) + str(getattr(e, "resp", e))).lower():
                 raise HTTPException(
                     status_code=429, detail="Resource busy. Please try again later."
                 ) from e
diff --git a/agents-api/agents_api/workflows/task_execution/helpers.py b/agents-api/agents_api/workflows/task_execution/helpers.py
index 6d47b3a1f..09ecb6150 100644
--- a/agents-api/agents_api/workflows/task_execution/helpers.py
+++ b/agents-api/agents_api/workflows/task_execution/helpers.py
@@ -10,6 +10,7 @@
 with workflow.unsafe.imports_passed_through():
     from ...activities import task_steps
     from ...autogen.openapi_model import (
+        EvaluateStep,
         TransitionTarget,
         Workflow,
         WorkflowStep,
@@ -90,7 +91,7 @@ async def execute_if_else_branch(
     context: StepContext,
     execution_input: ExecutionInput,
     then_branch: WorkflowStep,
-    else_branch: WorkflowStep,
+    else_branch: WorkflowStep | None,
     condition: bool,
     previous_inputs: RemoteList | list[Any],
     user_state: dict[str, Any] = {},
@@ -98,6 +99,9 @@
     workflow.logger.info(f"If-Else step: Condition evaluated to {condition}")
     chosen_branch = then_branch if condition else else_branch
 
+    if chosen_branch is None:
+        chosen_branch = EvaluateStep(evaluate={"output": "_"})
+
     if_else_wf_name = f"`{context.cursor.workflow}`[{context.cursor.step}].if_else"
     if_else_wf_name += ".then" if condition else ".else"
 
diff --git a/memory-store/docker-compose.yml b/memory-store/docker-compose.yml
index 51dcaea32..190d1b125 100644
--- a/memory-store/docker-compose.yml
+++ b/memory-store/docker-compose.yml
@@ -16,6 +16,14 @@ services:
     ports:
       - "9070:9070"
 
+    develop:
+      watch:
+        - action: sync+restart
+          path: ./options
+          target: /data/cozo.db/options
+        - action: rebuild
+          path: Dockerfile
+
     labels:
       ofelia.enabled: "true"
       ofelia.job-exec.backupcron.schedule: "@every 3h"
@@ -35,4 +43,4 @@ volumes:
   cozo_data:
     external: true
   cozo_backup:
-    external: true
\ No newline at end of file
+    external: true
diff --git a/memory-store/options b/memory-store/options
index 8a8e9b474..b11fcc62b 100644
--- a/memory-store/options
+++ b/memory-store/options
@@ -12,12 +12,12 @@
   compaction_readahead_size=2097152
   strict_bytes_per_sync=false
   bytes_per_sync=1048576
-  max_background_jobs=16
+  max_background_jobs=32
   avoid_flush_during_shutdown=false
   max_background_flushes=-1
   delayed_write_rate=16777216
   max_open_files=-1
-  max_subcompactions=2
+  max_subcompactions=8
   writable_file_max_buffer_size=5048576
   wal_bytes_per_sync=0
   max_background_compactions=-1
@@ -100,10 +100,10 @@
   memtable_protection_bytes_per_key=0
   target_file_size_multiplier=1
   report_bg_io_stats=false
-  write_buffer_size=267108864
+  write_buffer_size=534217728
   memtable_huge_page_size=0
   max_successive_merges=0
-  max_write_buffer_number=25
+  max_write_buffer_number=50
   prefix_extractor=rocksdb.CappedPrefix.9
   bottommost_compression_opts={checksum=false;max_dict_buffer_bytes=0;enabled=false;max_dict_bytes=0;max_compressed_bytes_per_kb=896;parallel_threads=1;zstd_max_train_bytes=0;level=32767;use_zstd_dict_trainer=true;strategy=0;window_bits=-14;}
   paranoid_file_checks=false
diff --git a/scheduler/docker-compose.yml b/scheduler/docker-compose.yml
index 2c0cabacd..967fe5f40 100644
--- a/scheduler/docker-compose.yml
+++ b/scheduler/docker-compose.yml
@@ -107,3 +107,4 @@ services:
 
 volumes:
   temporal-db-data:
+    external: true
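A minimal sketch of the behavior the execute_system/storage_handler changes aim for: arguments that were offloaded to the blob store arrive as a bare {"bucket", "key"} dict and are rehydrated before dispatch. This assumes `s3.get_object(key)` returns the serialized payload and `deserialize` reverses the serializer used when the arguments were offloaded; the in-memory blob store, `s3_get_object`, and `deserialize` below are hypothetical stand-ins for the real client code, not the actual implementation.

```python
# Illustrative sketch (assumed behavior), not the real agents-api code.
from typing import Any
import pickle

# Hypothetical in-memory stand-in for the S3 blob store.
FAKE_BLOB_STORE: dict[str, bytes] = {
    "execution-input-123": pickle.dumps({"query": "recent sessions", "limit": 10}),
}

def s3_get_object(key: str) -> bytes:
    # Stand-in for the real s3.get_object used in storage_handler.py.
    return FAKE_BLOB_STORE[key]

def deserialize(blob: bytes) -> Any:
    # Stand-in for the real deserializer paired with the offloading serializer.
    return pickle.loads(blob)

def load_from_blob_store_if_remote(x: Any) -> Any:
    # Mirrors the new branch: a dict with exactly the keys {"bucket", "key"}
    # is treated as a pointer into the blob store and fetched + deserialized.
    if isinstance(x, dict) and set(x.keys()) == {"bucket", "key"}:
        return deserialize(s3_get_object(x["key"]))
    return x

remote_ref = {"bucket": "agents-api-blobs", "key": "execution-input-123"}
print(load_from_blob_store_if_remote(remote_ref))
# -> {'query': 'recent sessions', 'limit': 10}
print(load_from_blob_store_if_remote({"query": "inline"}))  # non-remote dicts pass through
# -> {'query': 'inline'}
```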
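The models/utils.py change doubles the retry budget for 429 "resource busy" responses and also matches "busy" in the exception's own message rather than only its `resp` attribute. Below is a small sketch of the resulting tenacity policy, assuming standard tenacity semantics (up to four attempts, exponential backoff clamped between 4 and 10 seconds); `FlakyStore` and `fetch` are hypothetical examples, only the decorator arguments mirror the patch.

```python
# Hedged sketch of the retry policy after the change; running it sleeps ~4s
# between attempts because of the wait_exponential(min=4, max=10) backoff.
from fastapi import HTTPException
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential

def is_resource_busy(e: Exception) -> bool:
    # Same predicate as in models/utils.py: retry only on HTTP 429.
    return isinstance(e, HTTPException) and e.status_code == 429

class FlakyStore:
    """Hypothetical resource that returns 429 a few times before succeeding."""

    def __init__(self, failures: int) -> None:
        self.failures = failures

    def query(self) -> str:
        if self.failures > 0:
            self.failures -= 1
            raise HTTPException(status_code=429, detail="Resource busy.")
        return "ok"

store = FlakyStore(failures=3)

@retry(
    stop=stop_after_attempt(4),  # was stop_after_attempt(2) before the patch
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=retry_if_exception(is_resource_busy),
)
def fetch() -> str:
    return store.query()

print(fetch())  # succeeds on the 4th attempt after three 429 responses
```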