Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

F/migrate workflows to pg #989

Merged
merged 5 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agents-api/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ COPY . ./
ENV PYTHONUNBUFFERED=1
ENV GUNICORN_CMD_ARGS="--capture-output --enable-stdio-inheritance"

ENTRYPOINT ["uv", "run", "gunicorn", "agents_api.web:app", "-c", "gunicorn_conf.py"]
ENTRYPOINT ["uv", "run", "--offline", "--no-sync", "gunicorn", "agents_api.web:app", "-c", "gunicorn_conf.py"]
2 changes: 1 addition & 1 deletion agents-api/Dockerfile.worker
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ COPY . ./
ENV PYTHONUNBUFFERED=1
ENV GUNICORN_CMD_ARGS="--capture-output --enable-stdio-inheritance"

ENTRYPOINT ["uv", "run", "python", "-m", "agents_api.worker"]
ENTRYPOINT ["uv", "run", "--offline", "--no-sync", "python", "-m", "agents_api.worker"]
2 changes: 2 additions & 0 deletions agents-api/agents_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@

with workflow.unsafe.imports_passed_through():
import msgpack as msgpack

import os
73 changes: 0 additions & 73 deletions agents-api/agents_api/activities/embed_docs.py

This file was deleted.

2 changes: 1 addition & 1 deletion agents-api/agents_api/activities/execute_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ..common.exceptions.tools import IntegrationExecutionException
from ..common.protocol.tasks import ExecutionInput, StepContext
from ..env import testing
from ..models.tools import get_tool_args_from_metadata
from ..queries.tools import get_tool_args_from_metadata


@beartype
Expand Down
3 changes: 1 addition & 2 deletions agents-api/agents_api/activities/task_steps/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
# ruff: noqa: F401, F403, F405

from .base_evaluate import base_evaluate

# from .cozo_query_step import cozo_query_step
from .evaluate_step import evaluate_step
from .for_each_step import for_each_step
from .get_value_step import get_value_step
from .if_else_step import if_else_step
from .log_step import log_step
from .map_reduce_step import map_reduce_step
from .pg_query_step import pg_query_step
from .prompt_step import prompt_step
from .raise_complete_async import raise_complete_async
from .return_step import return_step
Expand Down
28 changes: 0 additions & 28 deletions agents-api/agents_api/activities/task_steps/cozo_query_step.py

This file was deleted.

38 changes: 38 additions & 0 deletions agents-api/agents_api/activities/task_steps/pg_query_step.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import Any

from async_lru import alru_cache
from beartype import beartype
from temporalio import activity

from ... import queries
from ...clients.pg import create_db_pool
from ...env import pg_dsn, testing


@alru_cache(maxsize=1)
async def get_db_pool(dsn: str):
return await create_db_pool(dsn=dsn)


@beartype
async def pg_query_step(
query_name: str,
values: dict[str, Any],
dsn: str = pg_dsn,
) -> Any:
pool = await get_db_pool(dsn=dsn)

(module_name, name) = query_name.split(".")

module = getattr(queries, module_name)
query = getattr(module, name)
return await query(**values, connection_pool=pool)


# Note: This is here just for clarity. We could have just imported pg_query_step directly
# They do the same thing, so we dont need to mock the pg_query_step function
mock_pg_query_step = pg_query_step

pg_query_step = activity.defn(name="pg_query_step")(
pg_query_step if not testing else mock_pg_query_step
)
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
)
from ...exceptions import LastErrorInput, TooManyRequestsError
from ...queries.executions.create_execution_transition import (
create_execution_transition_async,
create_execution_transition,
)
from ..utils import RateLimiter

Expand Down Expand Up @@ -52,7 +52,7 @@ async def transition_step(

# Create transition
try:
transition = await create_execution_transition_async(
transition = await create_execution_transition(
developer_id=context.execution_input.developer_id,
execution_id=context.execution_input.execution.id,
task_id=context.execution_input.task.id,
Expand Down
44 changes: 22 additions & 22 deletions agents-api/agents_api/activities/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,28 +296,28 @@ def get_handler(system: SystemDef) -> Callable:
The base handler function.
"""

from ..models.agent.create_agent import create_agent as create_agent_query
from ..models.agent.delete_agent import delete_agent as delete_agent_query
from ..models.agent.get_agent import get_agent as get_agent_query
from ..models.agent.list_agents import list_agents as list_agents_query
from ..models.agent.update_agent import update_agent as update_agent_query
from ..models.docs.delete_doc import delete_doc as delete_doc_query
from ..models.docs.list_docs import list_docs as list_docs_query
from ..models.session.create_session import create_session as create_session_query
from ..models.session.delete_session import delete_session as delete_session_query
from ..models.session.get_session import get_session as get_session_query
from ..models.session.list_sessions import list_sessions as list_sessions_query
from ..models.session.update_session import update_session as update_session_query
from ..models.task.create_task import create_task as create_task_query
from ..models.task.delete_task import delete_task as delete_task_query
from ..models.task.get_task import get_task as get_task_query
from ..models.task.list_tasks import list_tasks as list_tasks_query
from ..models.task.update_task import update_task as update_task_query
from ..models.user.create_user import create_user as create_user_query
from ..models.user.delete_user import delete_user as delete_user_query
from ..models.user.get_user import get_user as get_user_query
from ..models.user.list_users import list_users as list_users_query
from ..models.user.update_user import update_user as update_user_query
from ..queries.agents.create_agent import create_agent as create_agent_query
from ..queries.agents.delete_agent import delete_agent as delete_agent_query
from ..queries.agents.get_agent import get_agent as get_agent_query
from ..queries.agents.list_agents import list_agents as list_agents_query
from ..queries.agents.update_agent import update_agent as update_agent_query
from ..queries.docs.delete_doc import delete_doc as delete_doc_query
from ..queries.docs.list_docs import list_docs as list_docs_query
from ..queries.sessions.create_session import create_session as create_session_query
from ..queries.sessions.delete_session import delete_session as delete_session_query
from ..queries.sessions.get_session import get_session as get_session_query
from ..queries.sessions.list_sessions import list_sessions as list_sessions_query
from ..queries.sessions.update_session import update_session as update_session_query
from ..queries.tasks.create_task import create_task as create_task_query
from ..queries.tasks.delete_task import delete_task as delete_task_query
from ..queries.tasks.get_task import get_task as get_task_query
from ..queries.tasks.list_tasks import list_tasks as list_tasks_query
from ..queries.tasks.update_task import update_task as update_task_query
from ..queries.users.create_user import create_user as create_user_query
from ..queries.users.delete_user import delete_user as delete_user_query
from ..queries.users.get_user import get_user as get_user_query
from ..queries.users.list_users import list_users as list_users_query
from ..queries.users.update_user import update_user as update_user_query
from ..routers.docs.create_doc import create_agent_doc, create_user_doc
from ..routers.docs.search_docs import search_agent_docs, search_user_docs
from ..routers.sessions.chat import chat
Expand Down
30 changes: 16 additions & 14 deletions agents-api/agents_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
@asynccontextmanager
async def lifespan(app: FastAPI):
# INIT POSTGRES #
db_dsn = os.environ.get("DB_DSN")
pg_dsn = os.environ.get("PG_DSN")

if not getattr(app.state, "postgres_pool", None):
app.state.postgres_pool = await create_db_pool(db_dsn)
app.state.postgres_pool = await create_db_pool(pg_dsn)

# INIT S3 #
s3_access_key = os.environ.get("S3_ACCESS_KEY")
Expand Down Expand Up @@ -67,7 +67,8 @@ async def lifespan(app: FastAPI):
lifespan=lifespan,
#
# Global dependencies
dependencies=[Depends(valid_content_length)],
# FIXME: This is blocking access to scalar
# dependencies=[Depends(valid_content_length)],
)

# Enable metrics
Expand All @@ -92,19 +93,20 @@ async def scalar_html():


# content-length validation
# FIXME: This is blocking access to scalar
# NOTE: This relies on client reporting the correct content-length header
# TODO: We should use streaming for large payloads
@app.middleware("http")
async def validate_content_length(
request: Request,
call_next: Callable[[Request], Coroutine[Any, Any, Response]],
):
content_length = request.headers.get("content-length")
# @app.middleware("http")
# async def validate_content_length(
# request: Request,
# call_next: Callable[[Request], Coroutine[Any, Any, Response]],
# ):
# content_length = request.headers.get("content-length")

if not content_length:
return Response(status_code=411, content="Content-Length header is required")
# if not content_length:
# return Response(status_code=411, content="Content-Length header is required")

if int(content_length) > max_payload_size:
return Response(status_code=413, content="Payload too large")
# if int(content_length) > max_payload_size:
# return Response(status_code=413, content="Payload too large")

return await call_next(request)
# return await call_next(request)
2 changes: 1 addition & 1 deletion agents-api/agents_api/clients/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
The `clients` module contains client classes and functions for interacting with various external services and APIs, abstracting the complexity of HTTP requests and API interactions to provide a simplified interface for the rest of the application.

- `cozo.py`: Handles communication with the Cozo service, facilitating operations such as retrieving product information.
- `pg.py`: Handles communication with the PostgreSQL service, facilitating operations such as retrieving product information.
- `embed.py`: Manages requests to an Embedding Service for text embedding functionalities.
- `openai.py`: Facilitates interaction with OpenAI's API for natural language processing tasks.
- `temporal.py`: Provides functionality for connecting to Temporal workflows, enabling asynchronous task execution and management.
Expand Down
4 changes: 2 additions & 2 deletions agents-api/agents_api/clients/pg.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import asyncpg

from ..env import db_dsn
from ..env import pg_dsn


async def _init_conn(conn):
Expand All @@ -16,5 +16,5 @@ async def _init_conn(conn):

async def create_db_pool(dsn: str | None = None):
return await asyncpg.create_pool(
dsn if dsn is not None else db_dsn, init=_init_conn
dsn if dsn is not None else pg_dsn, init=_init_conn
)
2 changes: 1 addition & 1 deletion agents-api/agents_api/common/protocol/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class PartialTransition(create_partial_model(CreateTransitionRequest)):

class ExecutionInput(BaseModel):
developer_id: UUID
execution: Execution
execution: Execution | None = None
task: TaskSpecDef
agent: Agent
agent_tools: list[Tool | CreateToolRequest]
Expand Down
2 changes: 1 addition & 1 deletion agents-api/agents_api/common/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
The `utils` module within the `agents-api` project offers a collection of utility functions designed to support various aspects of the application. This includes:

- `cozo.py`: Utilities for interacting with the Cozo API client, including data mutation processes.
- `pg.py`: Utilities for interacting with the PostgreSQL API client, including data mutation processes.
- `datetime.py`: Functions for handling date and time operations, ensuring consistent use of time zones and formats across the application.
- `json.py`: Custom JSON utilities, including a custom JSON encoder for handling specific object types like UUIDs, and a utility function for JSON serialization with support for default values for None objects.

Expand Down
26 changes: 0 additions & 26 deletions agents-api/agents_api/common/utils/cozo.py

This file was deleted.

Loading