diff --git a/agents-api/agents_api/activities/logger.py b/agents-api/agents_api/activities/logger.py
new file mode 100644
index 000000000..4880d0652
--- /dev/null
+++ b/agents-api/agents_api/activities/logger.py
@@ -0,0 +1,8 @@
+import logging
+
+
+logger = logging.getLogger(__name__)
+h = logging.StreamHandler()
+fmt = logging.Formatter("[%(asctime)s/%(levelname)s] - %(message)s")
+h.setFormatter(fmt)
+logger.addHandler(h)
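
Usage note: the handler above attaches a formatter, but no level is ever set on the logger itself, so it inherits the root logger's effective level (WARNING by default). A minimal consumption sketch, mirroring how summarization.py below uses it; the setLevel call is an assumption for illustration, not part of this patch:

    import logging

    from agents_api.activities.logger import logger

    logger.setLevel(logging.ERROR)  # hypothetical; the module itself sets no level

    try:
        raise RuntimeError("simulated cleanup failure")
    except RuntimeError as exc:
        logger.exception(exc)  # logs at ERROR level and appends the traceback
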
diff --git a/agents-api/agents_api/activities/summarization.py b/agents-api/agents_api/activities/summarization.py
index 85a8a9ae5..1eaa2003d 100644
--- a/agents-api/agents_api/activities/summarization.py
+++ b/agents-api/agents_api/activities/summarization.py
@@ -1,17 +1,25 @@
#!/usr/bin/env python3
+import asyncio
+from pycozo.client import QueryException
from uuid import UUID
from typing import Callable
from textwrap import dedent
from temporalio import activity
from litellm import acompletion
+from agents_api.models.entry.add_entries import add_entries_query
+from agents_api.models.entry.delete_entries import delete_entries_by_ids_query
from agents_api.models.entry.entries_summarization import (
get_toplevel_entries_query,
entries_summarization_query,
)
from agents_api.common.protocol.entries import Entry
from ..model_registry import JULEP_MODELS
-from ..env import summarization_model_name, model_inference_url, model_api_key
+from ..env import model_inference_url, model_api_key
+from agents_api.rec_sum.entities import get_entities
+from agents_api.rec_sum.summarize import summarize_messages
+from agents_api.rec_sum.trim import trim_messages
+from agents_api.activities.logger import logger
example_previous_memory = """
@@ -157,31 +165,85 @@ async def run_prompt(
return parser(content.strip() if content is not None else "")
+# @activity.defn
+# async def summarization(session_id: str) -> None:
+# session_id = UUID(session_id)
+# entries = [
+# Entry(**row)
+# for _, row in get_toplevel_entries_query(session_id=session_id).iterrows()
+# ]
+
+# assert len(entries) > 0, "no need to summarize on empty entries list"
+
+# response = await run_prompt(
+# dialog=entries, previous_memories=[], model=summarization_model_name
+# )
+
+# new_entry = Entry(
+# session_id=session_id,
+# source="summarizer",
+# role="system",
+# name="information",
+# content=response,
+# timestamp=entries[-1].timestamp + 0.01,
+# )
+
+# entries_summarization_query(
+# session_id=session_id,
+# new_entry=new_entry,
+# old_entry_ids=[e.id for e in entries],
+# )
+
+
@activity.defn
async def summarization(session_id: str) -> None:
session_id = UUID(session_id)
- entries = [
- Entry(**row)
- for _, row in get_toplevel_entries_query(session_id=session_id).iterrows()
- ]
+ entries = []
+ entities_entry_ids = []
+ for _, row in get_toplevel_entries_query(session_id=session_id).iterrows():
+ if row["role"] == "system" and row.get("name") == "entities":
+ entities_entry_ids.append(row["entry_id"])
+ else:
+ entries.append(row)
assert len(entries) > 0, "no need to summarize on empty entries list"
- response = await run_prompt(
- dialog=entries, previous_memories=[], model=f"openai/{summarization_model_name}"
+ trimmed_messages, entities = await asyncio.gather(
+ trim_messages(entries),
+ get_entities(entries),
)
-
- new_entry = Entry(
- session_id=session_id,
- source="summarizer",
- role="system",
- name="information",
- content=response,
- timestamp=entries[-1].timestamp + 0.01,
+ summarized = await summarize_messages(trimmed_messages)
+
+ ts_delta = (entries[1]["timestamp"] - entries[0]["timestamp"]) / 2
+
+ add_entries_query(
+ Entry(
+ session_id=session_id,
+ source="summarizer",
+ role="system",
+ name="entities",
+ content=entities["content"],
+ timestamp=entries[0]["timestamp"] + ts_delta,
+ )
)
- entries_summarization_query(
- session_id=session_id,
- new_entry=new_entry,
- old_entry_ids=[e.id for e in entries],
- )
+ try:
+ delete_entries_by_ids_query(entry_ids=entities_entry_ids)
+ except QueryException as e:
+ logger.exception(e)
+
+ for msg in summarized:
+ new_entry = Entry(
+ session_id=session_id,
+ source="summarizer",
+ role="system",
+ name="information",
+ content=msg["content"],
+ timestamp=entries[-1]["timestamp"] + 0.01,
+ )
+
+ entries_summarization_query(
+ session_id=session_id,
+ new_entry=new_entry,
+ old_entry_ids=[entries[idx]["entry_id"] for idx in msg["summarizes"]],
+ )
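
For orientation, a hedged sketch of how this activity might be scheduled from a Temporal workflow. Only the activity itself appears in this diff; the workflow class and timeout below are assumptions:

    from datetime import timedelta

    from temporalio import workflow

    from agents_api.activities.summarization import summarization

    @workflow.defn
    class SummarizationWorkflow:  # hypothetical wrapper, not part of this patch
        @workflow.run
        async def run(self, session_id: str) -> None:
            # The activity accepts the session id as a string and parses the UUID itself.
            await workflow.execute_activity(
                summarization,
                session_id,
                start_to_close_timeout=timedelta(minutes=5),
            )
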
diff --git a/agents-api/agents_api/models/entry/delete_entries.py b/agents-api/agents_api/models/entry/delete_entries.py
index 8e7aacbcf..d13809d11 100644
--- a/agents-api/agents_api/models/entry/delete_entries.py
+++ b/agents-api/agents_api/models/entry/delete_entries.py
@@ -58,3 +58,50 @@ def delete_entries_query(session_id: UUID) -> tuple[str, dict]:
}"""
return (query, {"session_id": str(session_id)})
+
+
+@cozo_query
+def delete_entries_by_ids_query(entry_ids: list[UUID]) -> tuple[str, dict]:
+ entry_ids = [f'to_uuid("{id}")' for id in entry_ids]
+
+ query = """
+ {
+ input[entry_id] <- $entry_ids
+
+ ?[
+ session_id,
+ entry_id,
+ role,
+ name,
+ content,
+ source,
+ token_count,
+ created_at,
+ timestamp,
+ ] := input[entry_id],
+ *entries{
+ session_id,
+ entry_id,
+ role,
+ name,
+ content,
+ source,
+ token_count,
+ created_at,
+ timestamp,
+ }
+
+ :delete entries {
+ session_id,
+ entry_id,
+ role,
+ name,
+ content,
+ source,
+ token_count,
+ created_at,
+ timestamp,
+ }
+ }"""
+
+ return (query, {"entry_ids": entry_ids})
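
A hedged usage sketch: judging from the callers above, the @cozo_query decorator executes the returned (query, params) pair against the Cozo client, so deleting the stale "entities" entries reduces to a single call. The ids here are invented for illustration, and the call needs a live database:

    from uuid import uuid4

    from agents_api.models.entry.delete_entries import delete_entries_by_ids_query

    stale_entry_ids = [uuid4(), uuid4()]  # in summarization.py these come from rows named "entities"
    delete_entries_by_ids_query(entry_ids=stale_entry_ids)
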
diff --git a/agents-api/agents_api/rec_sum/__init__.py b/agents-api/agents_api/rec_sum/__init__.py
index c2b9d406e..e69de29bb 100644
--- a/agents-api/agents_api/rec_sum/__init__.py
+++ b/agents-api/agents_api/rec_sum/__init__.py
@@ -1,3 +0,0 @@
-from .entities import get_entities
-from .summarize import summarize_messages
-from .trim import trim_messages
diff --git a/agents-api/agents_api/rec_sum/data.py b/agents-api/agents_api/rec_sum/data.py
index 2caa7231d..93e78851c 100644
--- a/agents-api/agents_api/rec_sum/data.py
+++ b/agents-api/agents_api/rec_sum/data.py
@@ -5,28 +5,21 @@
module_directory = Path(__file__).parent
-
-with open(f"{module_directory}/entities_example_chat.json", 'r') as _f:
+with open(f"{module_directory}/entities_example_chat.json", "r") as _f:
entities_example_chat = json.load(_f)
-
-with open(f"{module_directory}/trim_example_chat.json", 'r') as _f:
+with open(f"{module_directory}/trim_example_chat.json", "r") as _f:
trim_example_chat = json.load(_f)
-
-with open(f"{module_directory}/trim_example_result.json", 'r') as _f:
+with open(f"{module_directory}/trim_example_result.json", "r") as _f:
trim_example_result = json.load(_f)
-
-with open(f"{module_directory}/summarize_example_chat.json", 'r') as _f:
+with open(f"{module_directory}/summarize_example_chat.json", "r") as _f:
summarize_example_chat = json.load(_f)
-
-with open(f"{module_directory}/summarize_example_result.json", 'r') as _f:
+with open(f"{module_directory}/summarize_example_result.json", "r") as _f:
summarize_example_result = json.load(_f)
-
-
diff --git a/agents-api/agents_api/rec_sum/entities.py b/agents-api/agents_api/rec_sum/entities.py
index baf702597..2a853e337 100644
--- a/agents-api/agents_api/rec_sum/entities.py
+++ b/agents-api/agents_api/rec_sum/entities.py
@@ -1,6 +1,6 @@
import json
-from tenacity import retry, stop_after_attempt, wait_fixed
+from tenacity import retry, stop_after_attempt
from .data import entities_example_chat
from .generate import generate
@@ -41,50 +41,23 @@
- See the example to get a better idea of the task."""
-make_entities_prompt = lambda session, user="a user", assistant="gpt-4-turbo", **_: [f"""\
-You are given a session history of a chat between {user or "a user"} and {assistant or "gpt-4-turbo"}. The session is formatted in the ChatML JSON format (from OpenAI).
-
-{entities_instructions}
-
-
-{json.dumps(entities_example_chat, indent=2)}
-
-
-
-{entities_example_plan}
-
-
-
-{entities_example_result}
-""",
-
-f"""\
-Begin! Write the entities as a Markdown formatted list. First write your plan inside and then the extracted entities between .
-
-
-{json.dumps(session, indent=2)}
-
-"""]
-
+def make_entities_prompt(session, user="a user", assistant="gpt-4-turbo", **_):
+ return [
+ f"You are given a session history of a chat between {user or 'a user'} and {assistant or 'gpt-4-turbo'}. The session is formatted in the ChatML JSON format (from OpenAI).\n\n{entities_instructions}\n\n\n{json.dumps(entities_example_chat, indent=2)}\n\n\n\n{entities_example_plan}\n\n\n\n{entities_example_result}\n",
+ f"Begin! Write the entities as a Markdown formatted list. First write your plan inside and then the extracted entities between .\n\n\n{json.dumps(session, indent=2)}\n\n",
+ ]
@retry(stop=stop_after_attempt(2))
async def get_entities(
chat_session,
-    model="gpt-4-turbo",
-    stop=["</..."],
-):
+):
    assert len(chat_session) > 2, "Session is too short"
- # Remove the system prompt if present
- if (
- chat_session[0]["role"] == "system"
- and chat_session[0].get("name") != "entities"
- ):
- chat_session = chat_session[1:]
-
names = get_names_from_session(chat_session)
system_prompt, user_message = make_entities_prompt(chat_session, **names)
messages = [chatml.system(system_prompt), chatml.user(user_message)]
@@ -100,5 +73,5 @@ async def get_entities(
result["content"] = result["content"].split("")[-1].strip()
result["role"] = "system"
result["name"] = "entities"
-
+
return chatml.make(**result)
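
A sketch of calling get_entities on a toy ChatML session. It requires a live model behind rec_sum.generate, so treat it as illustrative; the names and messages are invented:

    import asyncio

    from agents_api.rec_sum.entities import get_entities

    chat_session = [
        {"role": "user", "name": "Camille", "content": "I just adopted a cat named Dmitri."},
        {"role": "assistant", "name": "JaneBot", "content": "Congratulations! How is he settling in?"},
        {"role": "user", "name": "Camille", "content": "He hides under the bed a lot."},
    ]

    entities_entry = asyncio.run(get_entities(chat_session))
    # Per the code above, the result is a ChatML dict:
    # {"role": "system", "name": "entities", "content": "..."}
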
diff --git a/agents-api/agents_api/rec_sum/generate.py b/agents-api/agents_api/rec_sum/generate.py
index 99c0a5e79..d8a280d36 100644
--- a/agents-api/agents_api/rec_sum/generate.py
+++ b/agents-api/agents_api/rec_sum/generate.py
@@ -7,16 +7,15 @@
@retry(wait=wait_fixed(2), stop=stop_after_attempt(5))
async def generate(
- messages: list[dict],
- client: AsyncClient=client,
- model: str="gpt-4-turbo",
+ messages: list[dict],
+ client: AsyncClient = client,
+ model: str = "gpt-4-turbo",
**kwargs
) -> dict:
result = await client.chat.completions.create(
model=model, messages=messages, **kwargs
)
-
+
result = result.choices[0].message.__dict__
return result
-
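
For reference, the call path with the reformatted signature: client is the module-level AsyncClient imported at the top of generate.py, tenacity retries a failed call up to 5 times with a fixed 2-second wait, and any extra keyword arguments are forwarded to chat.completions.create. A minimal sketch, assuming a configured OpenAI-compatible backend:

    import asyncio

    from agents_api.rec_sum.generate import generate
    from agents_api.rec_sum.utils import chatml

    async def main() -> None:
        result = await generate(
            [chatml.system("You are terse."), chatml.user("Say hi.")],
            temperature=0.2,  # forwarded via **kwargs
        )
        print(result["content"])  # choices[0].message as a plain dict

    asyncio.run(main())
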
diff --git a/agents-api/agents_api/rec_sum/summarize.py b/agents-api/agents_api/rec_sum/summarize.py
index 9793be702..f0146210e 100644
--- a/agents-api/agents_api/rec_sum/summarize.py
+++ b/agents-api/agents_api/rec_sum/summarize.py
@@ -1,6 +1,6 @@
import json
-from tenacity import retry, stop_after_attempt, wait_fixed
+from tenacity import retry, stop_after_attempt
from .data import summarize_example_chat, summarize_example_result
from .generate import generate
@@ -24,7 +24,6 @@
- We can safely summarize message 34's essay into just the salient points only."""
-
summarize_instructions = """\
Your goal is to compactify the history by coalescing redundant information in messages into their summary in order to reduce its size and save costs.
@@ -36,53 +35,32 @@
- VERY IMPORTANT: Add the indices of messages that are being summarized so that those messages can then be removed from the session; otherwise, there'll be no way to identify which messages to remove. See example for more details."""
-
-make_summarize_prompt = lambda session, user="a user", assistant="gpt-4-turbo", **_: [f"""\
-You are given a session history of a chat between {user or "a user"} and {assistant or "gpt-4-turbo"}. The session is formatted in the ChatML JSON format (from OpenAI).
-
-{summarize_instructions}
-
-
-{json.dumps(add_indices(summarize_example_chat), indent=2)}
-
-
-
-{summarize_example_plan}
-
-
-
-{json.dumps(summarize_example_result, indent=2)}
-""",
-
-f"""\
-Begin! Write the summarized messages as a json list just like the example above. First write your plan inside and then your answer between . Don't forget to add the indices of the messages being summarized alongside each summary.
-
-
-{json.dumps(add_indices(session), indent=2)}
-
-"""]
-
+def make_summarize_prompt(session, user="a user", assistant="gpt-4-turbo", **_):
+ return [
+ f"You are given a session history of a chat between {user or 'a user'} and {assistant or 'gpt-4-turbo'}. The session is formatted in the ChatML JSON format (from OpenAI).\n\n{summarize_instructions}\n\n\n{json.dumps(add_indices(summarize_example_chat), indent=2)}\n\n\n\n{summarize_example_plan}\n\n\n\n{json.dumps(summarize_example_result, indent=2)}\n",
+ f"Begin! Write the summarized messages as a json list just like the example above. First write your plan inside and then your answer between . Don't forget to add the indices of the messages being summarized alongside each summary.\n\n\n{json.dumps(add_indices(session), indent=2)}\n\n",
+ ]
@retry(stop=stop_after_attempt(2))
async def summarize_messages(
chat_session,
-    model="gpt-4-turbo",
-    stop=["</..."],
-):
+):
    assert len(chat_session) > 2, "Session is too short"
offset = 0
-
+
# Remove the system prompt if present
if (
chat_session[0]["role"] == "system"
and chat_session[0].get("name") != "entities"
):
chat_session = chat_session[1:]
-
+
# The indices are not matched up correctly
offset = 1
@@ -98,7 +76,9 @@ async def summarize_messages(
)
assert "" in result["content"]
- summarized_messages = json.loads(result["content"].split("")[-1].strip())
+ summarized_messages = json.loads(
+ result["content"].split("")[-1].strip()
+ )
assert all((msg.get("summarizes") is not None for msg in summarized_messages))
@@ -107,5 +87,5 @@ async def summarize_messages(
{**msg, "summarizes": [i + offset for i in msg["summarizes"]]}
for msg in summarized_messages
]
-
+
return summarized_messages
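
The offset bookkeeping is the subtle part: when the leading system prompt is dropped, every index the model reports is one short of the session's real index, and the final comprehension realigns them. A self-contained illustration with no model call:

    # The model saw a session with its system prompt removed, so its indices 0 and 1
    # correspond to session indices 1 and 2.
    offset = 1
    summarized_messages = [
        {"role": "system", "name": "summary", "content": "recap", "summarizes": [0, 1]},
    ]

    realigned = [
        {**msg, "summarizes": [i + offset for i in msg["summarizes"]]}
        for msg in summarized_messages
    ]
    assert realigned[0]["summarizes"] == [1, 2]  # now indexes into the full session
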
diff --git a/agents-api/agents_api/rec_sum/trim.py b/agents-api/agents_api/rec_sum/trim.py
index b2af0b64c..eeb05719c 100644
--- a/agents-api/agents_api/rec_sum/trim.py
+++ b/agents-api/agents_api/rec_sum/trim.py
@@ -1,6 +1,6 @@
import json
-from tenacity import retry, stop_after_attempt, wait_fixed
+from tenacity import retry, stop_after_attempt
from .data import trim_example_chat, trim_example_result
from .generate import generate
@@ -19,7 +19,6 @@
- Message 7 is short enough and doesn't need any edits."""
-
trim_instructions = """\
Your goal is to identify messages in the session that are needlessly verbose and then trim them in length without losing any meaning or changing the tone of the message.
@@ -34,55 +33,23 @@
# It is important to keep the tone, setting, and flow of the conversation consistent while trimming the messages.
-make_trim_prompt = lambda session, user="a user", assistant="gpt-4-turbo", **_: [f"""\
-You are given a session history of a chat between {user or "a user"} and {assistant or "gpt-4-turbo"}. The session is formatted in the ChatML JSON format (from OpenAI).
-
-{trim_instructions}
-
-
-{json.dumps(add_indices(trim_example_chat), indent=2)}
-
-
-
-{trim_example_plan}
-
-
-
-{json.dumps(trim_example_result, indent=2)}
-""",
-
-f"""\
-Begin! Write the trimmed messages as a json list. First write your plan inside and then your answer between .
-
-
-{json.dumps(add_indices(session), indent=2)}
-
-"""]
-
+def make_trim_prompt(session, user="a user", assistant="gpt-4-turbo", **_):
+ return [
+ f"You are given a session history of a chat between {user or 'a user'} and {assistant or 'gpt-4-turbo'}. The session is formatted in the ChatML JSON format (from OpenAI).\n\n{trim_instructions}\n\n\n{json.dumps(add_indices(trim_example_chat), indent=2)}\n\n\n\n{trim_example_plan}\n\n\n\n{json.dumps(trim_example_result, indent=2)}\n",
+ f"Begin! Write the trimmed messages as a json list. First write your plan inside and then your answer between .\n\n\n{json.dumps(add_indices(session), indent=2)}\n\n",
+ ]
@retry(stop=stop_after_attempt(2))
async def trim_messages(
chat_session,
-    model="gpt-4-turbo",
-    stop=["</..."],
-):
+):
    assert len(chat_session) > 2, "Session is too short"
- offset = 0
-
- # Remove the system prompt if present
- if (
- chat_session[0]["role"] == "system"
- and chat_session[0].get("name") != "entities"
- ):
- chat_session = chat_session[1:]
-
- # The indices are not matched up correctly
- offset = 1
-
names = get_names_from_session(chat_session)
system_prompt, user_message = make_trim_prompt(chat_session, **names)
messages = [chatml.system(system_prompt), chatml.user(user_message)]
@@ -100,9 +67,6 @@ async def trim_messages(
assert all((msg.get("index") is not None for msg in trimmed_messages))
# Correct offset
- trimmed_messages = [
- {**msg, "index": msg["index"] + offset}
- for msg in trimmed_messages
- ]
-
+ trimmed_messages = [{**msg, "index": msg["index"]} for msg in trimmed_messages]
+
return trimmed_messages
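
Since trim_messages no longer strips a system prompt, the indices the model reports are already session indices, which is why the offset correction above reduces to a pass-through. The indices themselves come from add_indices in rec_sum.utils:

    from agents_api.rec_sum.utils import add_indices

    session = [
        {"role": "user", "content": "hi"},
        {"role": "assistant", "content": "hello there"},
    ]
    assert add_indices(session) == [
        {"index": 0, "role": "user", "content": "hi"},
        {"index": 1, "role": "assistant", "content": "hello there"},
    ]
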
diff --git a/agents-api/agents_api/rec_sum/utils.py b/agents-api/agents_api/rec_sum/utils.py
index 512bfff9f..596174e08 100644
--- a/agents-api/agents_api/rec_sum/utils.py
+++ b/agents-api/agents_api/rec_sum/utils.py
@@ -2,36 +2,53 @@
## Utils ##
###########
+
class chatml:
- make = lambda content, role="system", name=None, **_: {
- key: value
- for key, value in dict(role=role, name=name, content=content).items()
- if value is not None
- }
-
- user = lambda content, name=None: chatml.make(role="user", content=content, name=name)
- assistant = lambda content, name=None: chatml.make(
- role="assistant", content=content, name=name
- )
- system = lambda content, name=None: chatml.make(content, name=name)
- thought = lambda content, name=None: chatml.make(content, name="thought")
- information = lambda content: chatml.system(content, name="information")
- summary = lambda content: chatml.system(content, name="summary")
- entities = lambda content: chatml.system(content, name="entity")
+ @staticmethod
+ def make(content, role="system", name=None, **_):
+ return {
+ key: value
+ for key, value in dict(role=role, name=name, content=content).items()
+ if value is not None
+ }
+
+ @staticmethod
+ def user(content, name=None):
+ return chatml.make(role="user", content=content, name=name)
+
+ @staticmethod
+ def assistant(content, name=None):
+        return chatml.make(role="assistant", content=content, name=name)
+
+    @staticmethod
+ def system(content, name=None):
+        return chatml.make(content, name=name)
+
+    @staticmethod
+ def thought(content, name=None):
+        return chatml.make(content, name="thought")
+
-add_indices = lambda list_of_dicts, idx_name="index": [
- {idx_name: i, **msg} for i, msg in enumerate(list_of_dicts)
-]
+ @staticmethod
+ def information(content):
+        return chatml.system(content, name="information")
+
+    @staticmethod
+ def summary(content):
+        return chatml.system(content, name="summary")
+
+    @staticmethod
+ def entities(content):
+ return chatml.system(content, name="entity")
-get_names_from_session = lambda session: {
- role: next((
- msg.get("name", None)
- for msg in session
- if msg["role"] == role
- ), None)
- for role in {"user", "assistant", "system"}
-}
+
+def add_indices(list_of_dicts, idx_name="index"):
+ return [{idx_name: i, **msg} for i, msg in enumerate(list_of_dicts)]
+
+
+def get_names_from_session(session):
+ return {
+ role: next(
+ (msg.get("name", None) for msg in session if msg["role"] == role), None
+ )
+ for role in {"user", "assistant", "system"}
+ }
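
The staticmethod rewrite keeps the helpers behaviorally identical to the old lambdas; a quick check of the expected shapes:

    from agents_api.rec_sum.utils import chatml, get_names_from_session

    assert chatml.user("hi", name="Camille") == {
        "role": "user", "name": "Camille", "content": "hi"
    }
    assert chatml.summary("recap") == {
        "role": "system", "name": "summary", "content": "recap"
    }

    session = [chatml.user("hi", name="Camille"), chatml.assistant("hey", name="JaneBot")]
    assert get_names_from_session(session) == {
        "user": "Camille", "assistant": "JaneBot", "system": None
    }
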