diff --git a/README.md b/README.md
index c01fe3b..a5fb97c 100644
--- a/README.md
+++ b/README.md
@@ -3,18 +3,24 @@
 Sean Browning (NCEZID/OD Data Science Team)
 
 ## Background
 
-I wanted to learn how to create and use language agents to solve complex problems. Langchain wasn't cutting it for me, so I made my own library from first principles.
-Right now, everything is quite experimental and I'm still figuring out the API.
+I wanted to learn how to create and use language agents to solve complex problems. LangChain wasn't cutting it for me, so I made my own library from first principles to suit a few projects we're working on.
+
+This package contains a few classes that can be used as building blocks for language agents and agentic systems. I plan to expand it with additional functionality as I need it, while keeping a minimal footprint (i.e. if you're already using openai and pydantic, this should bring in no additional dependencies).
+
+All code uses asyncio by design, and though I've tried to generalize where I can, I mostly built around OpenAI (and specifically Azure OpenAI), since that's what we are allowed to work with internally.
 
 ## Installation
 
-Should be pip installable from this repo.
+Not currently on PyPI, so install directly from GitHub with pip:
 
 ```sh
-pip install git+https://github.com/cdcai/agents.git
+pip install git+https://github.com/cdcai/multiagent.git
 ```
 
 ## Examples
 
-#TODO
-
+| Example | Link |
+| ---- | ---- |
+| Taking output from one agent as input to another in a callback | [agent_with_callback.py](examples/agent_with_callback.py) |
+| Getting structured output from an agent / text prediction | [structured_prediction.py](examples/structured_prediction.py) |
+| Batch processing large inputs over the same agent in parallel | [batch_processing.py](examples/batch_processing.py) |
diff --git a/agents/__init__.py b/agents/__init__.py
index 9ad3c4e..86917f8 100644
--- a/agents/__init__.py
+++ b/agents/__init__.py
@@ -1,3 +1,5 @@
 from .agent import *
-from .env import WikiQAEnv
 from .generic import *
+from .processors import BatchProcessor, DFBatchProcessor
+from .callbacks import *
+from .stopping_conditions import *
\ No newline at end of file
diff --git a/agents/abstract.py b/agents/abstract.py
new file mode 100644
index 0000000..80836cd
--- /dev/null
+++ b/agents/abstract.py
@@ -0,0 +1,113 @@
+"""
+All abstract classes
+"""
+import abc
+import logging
+import os
+from typing import Callable, List, Optional, Union, Any
+
+import openai
+from openai.types.chat.chat_completion import Choice, ChatCompletionMessage
+
+logger = logging.getLogger(__name__)
+
+class _StoppingCondition(metaclass=abc.ABCMeta):
+    """
+    A callable that contains some logic to determine whether a language agent
+    has finished its run.
+ + The main call should always return the final answer, if we've finished the run, or None otherwise + """ + @abc.abstractmethod + def __call__(self, cls: '_Agent', response: str) -> Optional[Any]: + raise NotImplementedError() + +class _Agent(metaclass=abc.ABCMeta): + terminated: bool = False + truncated: bool = False + curr_step: int = 1 + output_len: int = 1 + scratchpad: str = "" + answer: Any = "" + BASE_PROMPT: str = "" + SYSTEM_PROMPT: str = "" + oai_kwargs : dict + TOOLS : list + CALLBACKS : list + callback_output: list + tool_res_payload: list[dict] + + def __init__( + self, + model_name: str, + stopping_condition: _StoppingCondition, + llm: Optional[openai.AsyncOpenAI] = None, + tools: Optional[List[dict]] = None, + callbacks: Optional[List[Callable]] = None, + oai_kwargs: Optional[dict[str, Any]] = None, + **fmt_kwargs + ): + pass + + @abc.abstractmethod + async def prompt_agent(self, prompt: Union[dict[str, str], list[dict[str, str]]], n_tok: Optional[int] = None, **addn_oai_kwargs) -> Choice: + raise NotImplementedError() + + @abc.abstractmethod + async def step(self): + """ + Run a single "step" of the agent logic. + Handles prompting OpenAI, optionally handling tool calls, and determining whether we've + finished or run out of tokens. + """ + raise NotImplementedError() + + @abc.abstractmethod + def _check_stop_condition(self, response: ChatCompletionMessage) -> None: + """ + Called from within :func:`step()`. + Checks whether our stop condition has been met and handles assignment of answer, if so. + + It's broken out this way because we may not always want to use message.content as the answer (tool call output, for instance) + """ + + @abc.abstractmethod + def format_prompt(self) -> str: + """ + Method which formats the BASE_PROMPT string, possibly inserting additional content. + This is usually called within get_next_message() to populate the first user message. 
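+
+        A minimal illustration (sketch only; the prompt text and ``topic`` keyword are hypothetical)::
+
+            BASE_PROMPT = "Write a short summary about {topic}."
+            # An agent constructed as Agent(..., topic="asyncio") would return
+            # "Write a short summary about asyncio." from format_prompt()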
+ """ + raise NotImplementedError() + + @abc.abstractmethod + def get_next_messages(self) -> list[dict[str, str]]: + raise NotImplementedError() + + @abc.abstractmethod + def _handle_tool_calls(self, response: Choice) -> None: + raise NotImplementedError() + + @property + def is_terminated(self) -> bool: + return self.terminated + + @property + def _known_tools(self) -> list[str]: + return [tool["function"]["name"] for tool in self.TOOLS] + + @property + def is_truncated(self) -> bool: + return self.truncated + + @abc.abstractmethod + def dump(self, outfile: Union[str, os.PathLike]) -> None: + raise NotImplementedError() + + @staticmethod + @abc.abstractmethod + def authenticate() -> None: + raise NotImplementedError() + + @abc.abstractmethod + def reset(self): + raise NotImplementedError() \ No newline at end of file diff --git a/agents/agent/__init__.py b/agents/agent/__init__.py index cf890ac..b135222 100644 --- a/agents/agent/__init__.py +++ b/agents/agent/__init__.py @@ -1,6 +1,7 @@ from .base import * -from .sas import * -from .reflexion import * from .prediction import * -__all__ = ["ReactAgent", "ReactandReflectAgent", "SASConvertAgent", "PythonRefineAgent", "CodeOutlinerAgent", "OutlineSummarizeAgent", "PythonSummarizeAgent", "PseudocodeSummarizeAgent", "PredictionAgent"] \ No newline at end of file +__all__ = [ + "Agent", "StructuredOutputAgent", + "PredictionAgent", "PredictionAgentWithJustification" +] \ No newline at end of file diff --git a/agents/agent/base.py b/agents/agent/base.py index 3d2052f..fc049a1 100644 --- a/agents/agent/base.py +++ b/agents/agent/base.py @@ -1,59 +1,107 @@ -import abc import json import logging import os import re -import subprocess -import sys -import tempfile -from typing import Callable, Literal, Optional, Union -from copy import deepcopy +from copy import deepcopy, copy +from typing import Any, Callable, Optional import backoff -import gymnasium as gym import openai -import tiktoken from azure.identity import ClientSecretCredential -from openai.types.chat.chat_completion import Choice, ChatCompletionMessage +from openai.types.chat.chat_completion import ChatCompletionMessage +from pydantic import BaseModel + +from ..abstract import _Agent +from ..decorators import response_model_handler +from ..stopping_conditions import StopOnDataModel logger = logging.getLogger(__name__) -class Agent(metaclass=abc.ABCMeta): - terminated: bool = False - truncated: bool = False - curr_step: int = 1 - scratchpad: str = "" - answer: str = "" - BASE_PROMPT: str = "" - SYSTEM_PROMPT: str = "" - oai_kwargs : dict = { - "temperature": 0.0 - } - def __init__( - self, question: str, model_name: str, llm: Optional[openai.OpenAI] = None, **oai_kwargs - ): - self.question = question +class Agent(_Agent): + """ + Base Class for language agents, which can be initialized directly or subclassed depending on use case. + + :param bool terminated: Whether agent has completed it's run (use :func:`reset`, to reset) + :param bool truncated: Whether the agent received a truncated response + :param int curr_step: Current step of the query process + :param int output_len: Expected length of answer `("", ) * output_len` + :param str scratchpad: Documents all steps querying and evaluating LLM responses + :param any answer: Final response from agent (default: str, could be any) + :param str BASE_PROMPT: Query prompt which should be populated with `fmt_kwargs` via fstr (and possibly other parameters via :func:`format_prompt`). 
sent as system message + :param str SYSTEM_PROMPT: System prompt (persona) sent as first message in chat session + :param dict oai_kwargs: OpenAI arguments passed as-is to API (temperature, top_p, etc.) + :param list TOOLS: List of tools the agent can use. Can be defined in subclass or at runtime. (see: https://platform.openai.com/docs/guides/function-calling) + :param list CALLBACKS: List of callbacks to evaluate at completion. Should be a list of callables with a signature `fun(self, answer, scratchpad)` + :param _StoppingCondition stopping_condition: The StoppingCondition handler class which will be called after each step to determine if the task is completed. + + + Tool Use + -------- + Each added tool must have a corresponding class method that can be invoked during :func:`step()` if the GPT calls it. + You should subclass accordingly. + + Callback Use + ------------ + Each callback will have access to class object, scratchpad, and final answer, thus the signature must match. + This is still quite experimental, but the intended usecase is for reflection / refinement applications. + """ + + def __init__(self, model_name, stopping_condition, llm = None, tools = None, callbacks = None, oai_kwargs = None, **fmt_kwargs): + """ + Base Agent class + + :param str model_name: Name of OpenAI model to use (or deployment name for AzureOpenAI) + :param _StoppingCondition stopping_condition: A handler that signals when an Agent has completed the task + :param AsyncOpenAI llm: Instantiated OpenAI instance to use (optional) + :param List[dict] tools: List of tools the agent can call via response (optional) + :param List[Callable] callbacks: List of callbacks to evaluate at end of run (optional) + :param dict[str, any] oai_kwargs: Dict of additional OpenAI arguments to pass thru to chat call + :param fmt_kwargs: Additional named arguments which will be inserted into the :func:`BASE_PROMPT` via fstring + """ + self.fmt_kwargs = fmt_kwargs + self.stopping_condition = stopping_condition # We default to Azure OpenAI here, but # we could also use something else as long as it follows the OpenAI API if llm is None: self.authenticate() - self.llm = openai.AzureOpenAI() + self.llm = openai.AsyncAzureOpenAI() else: self.llm = llm self.model_name = model_name - self.oai_kwargs.update(oai_kwargs) + + # Handle Tools + self.TOOLS = getattr(self, "TOOLS", []) + + if tools is not None: + self.TOOLS.extend(tools) + + # Handle Callbacks + self.CALLBACKS = getattr(self, "CALLBACKS", []) + + if callbacks is not None: + self.CALLBACKS.extend(callbacks) + + self.oai_kwargs = oai_kwargs if oai_kwargs is not None else {} + + if len(self.TOOLS): + self.oai_kwargs.update({"tools": self.TOOLS}) + self.reset() - def run(self, reset: bool = False) -> None: + async def run(self, reset: bool = False, *kwargs) -> None: if reset: self.reset() - while not (self.is_terminated() or self.is_truncated()): + while not (self.is_terminated or self.is_truncated): logger.debug(f"Running step {self.curr_step}.") - self.step() + await self.step() - def __call__(self, *args, **kwargs) -> str: + # Evaluate callbacks, if available + for callback in self.CALLBACKS: + await callback(self, answer=self.answer, scratchpad=self.scratchpad) + + async def __call__(self, *args, **kwargs) -> str: """ Run the underlying agent logic and returns the final answer. 
@@ -62,64 +110,123 @@ def __call__(self, *args, **kwargs) -> str: """ outfile = kwargs.pop("outfile", None) - self.run(*args, **kwargs) + await self.run(*args, **kwargs) if outfile is not None: self.dump(outfile) return self.answer - @abc.abstractmethod - def step(self): - raise NotImplementedError() - @backoff.on_exception(backoff.expo, (openai.APIError, openai.AuthenticationError), max_tries=3) - async def aprompt_agent(self, prompt: Union[dict[str, str], list[dict[str, str]]], n_tok: Optional[int] = None, **addn_oai_kwargs) -> Choice: + def _check_stop_condition(self, response): + # Check if we've reached a stopping place + if (answer := self.stopping_condition(self, response)) is not None: + self.answer = answer + self.terminated = True + logger.info("Stopping condition signaled, terminating.") + + async def step(self): """ - An async version of the main OAI prompting logic. + Run a single "step" of the agent logic. + Handles prompting OpenAI, optionally handling tool calls, and determining whether we've + finished or run out of tokens. + """ + # Pull base query + system messages + # (abstract) + llm_prompt_input = self.get_next_messages() - :param prompt: Either a dict or a list of dicts representing the message(s) to send to OAI model - :param n_tok: An optional maximum token length to request of the model response - :param addn_oai_kwargs: Key word arguments passed to completions.create() call (tool calls, etc.) + # Send off messages for reply + self.scratchpad += f"=== Step {self.curr_step} ===========\n" + + # Attempt to query GPT and handle invalid JSON parsing of args + response = None + n_retry = 3 + while response is None and n_retry > 0: + try: + response = await self.prompt_agent(llm_prompt_input) + except json.decoder.JSONDecodeError as e: + if n_retry == 0: + raise e + else: + self.scratchpad += "JSONDecodeError in tool call argumement parsing. Retrying.\n" + logger.warning(f"Tool calls in response couldn't be decoded. {n_retry} retries remaining.") + llm_prompt_input.append( + { + "role": "user", + "content": "The arguments to your previous tool call couldn't be parsed correctly. Please ensure you properly escapse quotes and construct a valid JSON payload." + } + ) + n_retry -= 1 + continue + except AssertionError as e: + # Agent tries to apply unknown function + if n_retry == 0: + raise e + else: + self.scratchpad += "Attempted to apply non-function, retrying.\n" + logger.warning(f"Agent attempted to apply undefined function. {n_retry} retries remaining.") + llm_prompt_input.append( + { + "role": "system", + "content": f"You attempted to apply an undefined function, you may only use the following functions as tool calls: {self._known_tools}." 
+ } + ) + n_retry -= 1 + continue + if response is None: + logger.warning("No response after 3 retries, Terminating!") + self.truncated = True + else: + if response.finish_reason == "length": + # Determine if we're truncated + self.truncated = True + logger.warning("Response truncated due to length, Terminating!") + # Recursive call if tool calls in response + elif response.finish_reason == "tool_calls": + self._handle_tool_calls(response) + + # Conditionally end run and assign answer + self._check_stop_condition(response) - :return: An openAI Choice response object + if self.terminated: + self.scratchpad += "===== Answer ============\n" + self.scratchpad += str(self.answer) + + # End Step + self.scratchpad += "==============================\n\n" + self.curr_step += 1 + + @staticmethod + def clean_response(res: str) -> str: + """ + Simple helper function to clean response text, if desired """ + out = res.strip('\n').strip().replace('\n', '') + return out - # Prompts should be passed as a list, so handle - # the case where we just passed a single dict - if isinstance(prompt, dict): - prompt = [prompt] + @staticmethod + def authenticate(): + """ + Authenticate against Azure OpenAI using Service Principal + + HACK: Obviously this assumes we use AzureOpenAI, so we could modularize this + """ + # === Service Principal auth ======================== + credential = ClientSecretCredential( + tenant_id=os.environ["SP_TENANT_ID"], + client_id=os.environ["SP_CLIENT_ID"], + client_secret=os.environ["SP_CLIENT_SECRET"], + ) - try: - res = await self.llm.chat.completions.create( - messages=prompt, model=self.model_name, max_tokens=n_tok, - **addn_oai_kwargs, - **self.oai_kwargs - ) - except openai.AuthenticationError: - logger.info("Auth failed, attempting to re-authenticate before retrying") - # HACK: This isn't terrific, but it should work for - # our current use case (Azure OpenAI with service principal/User creds) - if isinstance(self.llm, openai.AzureOpenAI): - self.authenticate() - self.llm.api_key = os.environ["AZURE_OPENAI_API_KEY"] - raise e - except Exception as e: - # TODO: some error handling here - logger.debug(e) - raise e + os.environ["AZURE_OPENAI_API_KEY"] = credential.get_token( + "https://cognitiveservices.azure.com/.default" + ).token - out = res.choices[0] - logger.info(f"Received response: {out.message.content}") + os.environ["AZURE_OPENAI_ENDPOINT"] = os.environ["GPT4_URL"] - if out.finish_reason == "length": - self.truncated = True - logger.warn("Message returned truncated.") - return out - @backoff.on_exception(backoff.expo, (openai.APIError, openai.AuthenticationError), max_tries=3) - def prompt_agent(self, prompt: Union[dict[str, str], list[dict[str, str]]], n_tok: Optional[int] = None, **addn_oai_kwargs) -> Choice: + async def prompt_agent(self, prompt, n_tok = None, **addn_oai_kwargs): """ - The main OAI prompting logic. + An async version of the main OAI prompting logic. 
:param prompt: Either a dict or a list of dicts representing the message(s) to send to OAI model :param n_tok: An optional maximum token length to request of the model response @@ -133,8 +240,12 @@ def prompt_agent(self, prompt: Union[dict[str, str], list[dict[str, str]]], n_to if isinstance(prompt, dict): prompt = [prompt] + self.scratchpad += f"--- Input ---------------------------\n" + self.scratchpad += "\n".join(msg["content"] for msg in prompt if not isinstance(msg, ChatCompletionMessage)) + self.scratchpad += "\n-----------------------------------\n" + try: - res = self.llm.chat.completions.create( + res = await self.llm.chat.completions.create( messages=prompt, model=self.model_name, max_tokens=n_tok, **addn_oai_kwargs, **self.oai_kwargs @@ -153,20 +264,55 @@ def prompt_agent(self, prompt: Union[dict[str, str], list[dict[str, str]]], n_to raise e out = res.choices[0] + + self.scratchpad += "--- Output --------------------------\n" + self.scratchpad += "Message:\n" + self.scratchpad += out.message.content if out.message.content else "" + "\n" + + if len(self.TOOLS): + # attempt to parse tool call arguments + if out.finish_reason == "tool_calls": + # Append GPT response to next payload + # NOTE: This has to come before the next step of parsing + self.tool_res_payload.append(deepcopy(out.message)) + + self.scratchpad += "Tool calls: \n" + for i, tool in enumerate(out.message.tool_calls): + try: + assert tool.function.name in self._known_tools + except Exception as e: + self.tool_res_payload.pop() + raise e + + out.message.tool_calls[i].function.arguments = json.loads(tool.function.arguments) + + # Log it + toolcall_str = f"{tool.function.name}({str(tool.function.arguments)[:50] + '...(trunc)' if len(str(tool.function.arguments)) > 50 else str(tool.function.arguments)})" + logger.info(f"Got toolcall: {toolcall_str}") + self.scratchpad += f"\t=> {toolcall_str}\n" + + self.scratchpad += "\n-----------------------------------\n" logger.info(f"Received response: {out.message.content}") if out.finish_reason == "length": self.truncated = True - logger.warn("Message returned truncated.") + self.scratchpad += "Response returned truncated from OpenAI due to token length.\n" + logger.warning("Message returned truncated.") return out - @abc.abstractmethod - def format_prompt(self, **kwargs) -> str: + def format_prompt(self) -> str: """ - Method which formats the BASE_QUERY string, possibly inserting additional content. + Method which formats the BASE_PROMPT string, possibly inserting additional content. This is usually called within get_next_message() to populate the first user message. 
""" - raise NotImplementedError() + if len(self.BASE_PROMPT) == 0: + raise ValueError("You initialized an Agent with not BASE_PROMPT, please define this attribute with your prompt, optionally adding any formatting args in brackets.") + try: + out = self.BASE_PROMPT.format(**self.fmt_kwargs) + except KeyError as err: + raise KeyError(f"The following format kwargs were not passed at init time to format the BASE_PROMPT: {err}.") + + return out def get_next_messages(self) -> list[dict[str, str]]: """ @@ -175,331 +321,16 @@ def get_next_messages(self) -> list[dict[str, str]]: """ out = [ {"role": "system", "content": self.SYSTEM_PROMPT}, - {"role": "user", "content": self.format_prompt()} + {"role": "system", "content": self.format_prompt()} ] - - return out - def is_terminated(self) -> bool: - return self.terminated - - def is_truncated(self) -> bool: - return self.truncated - - def reset(self) -> None: - """ - Reset agent state for a re-run - """ - self.scratchpad = "" - self.answer = "" - self.curr_step = 1 - self.truncated = False - self.terminated = False - - def dump(self, outfile: Union[str, os.PathLike]) -> None: - """ - Dump scratchfile to disk - """ - with open(outfile, "w", encoding="utf-8") as file: - file.writelines(elem + "\n" for elem in self.scratchpad.split("\n")) - - @staticmethod - def clean_response(res: str) -> str: - out = res.strip('\n').strip().replace('\n', '') - return out - - @staticmethod - def authenticate() -> None: - """ - Authenticate against Azure OpenAI using Service Principal - """ - # === Service Principal auth ======================== - credential = ClientSecretCredential( - tenant_id=os.environ["SP_TENANT_ID"], - client_id=os.environ["SP_CLIENT_ID"], - client_secret=os.environ["SP_CLIENT_SECRET"], - ) - - os.environ["AZURE_OPENAI_API_KEY"] = credential.get_token( - "https://cognitiveservices.azure.com/.default" - ).token - - os.environ["AZURE_OPENAI_ENDPOINT"] = os.environ["GPT4_URL"] - -class PersistentAgent(Agent): - APPEND_PROMPT : str = "{obs}" - conversation_cache : list[dict] - - def reset(self) -> None: - self.conversation_cache = [] - return super().reset() - - def step(self): - """ - Full Agent logic. 
Prompts LLM and saves answer - """ - llm_prompt_input = self.get_next_messages() - response = self.prompt_agent(llm_prompt_input, n_tok=None) - answer = response.message.content - self.scratchpad += f"=== Input ==========\n" - self.scratchpad += "\n".join(msg["content"] for msg in llm_prompt_input) - self.scratchpad += "\n===================================\n" - self.scratchpad += f"\n=== Answer =====\n" - self.scratchpad += "\n".join(answer) + "\n" - self.scratchpad += "\n===================================\n" - - self.answer = answer - self.conversation_cache.append({k: response.message.__dict__[k] for k in ["role", "content"]}) - self.terminated = True - - def format_append_prompt(self, obs: str) -> str: - return self.APPEND_PROMPT.format(obs=obs) - - def add_observation(self, obs: str) -> None: - """ - Append new message / observation to message cache - This inserts `obs` the `APPEND_PROMPT` attribute and appends to the conversation - """ - self.conversation_cache.append({ - "role": "user", - "content": self.format_append_prompt(obs) - }) - - def get_next_messages(self) -> list[dict[str, str]]: - out = super().get_next_messages() - out.extend(self.conversation_cache) - - return(out) - -class ReduceAgent(Agent): - """ - An agent which reduces a list[str] question to a single string output - """ - question : list[str] - - def __init__(self, question: list[str], model_name: str, llm: openai.OpenAI | None = None, **oai_kwargs): - self.question = [] - super().__init__(question, model_name, llm, **oai_kwargs) - def step(self): - """ - This will always be a single-step run to summarize the messages - """ - res = self.prompt_agent(self.get_next_messages(), n_tok=None) - self.answer = res.message.content - self.terminated = True - - def get_next_messages(self) -> list[dict[str, str]]: - out = super().get_next_messages() - out.append({ - "role": "assistant", - "content": "\n".join(self.question) - }) - - return out - def format_prompt(self, **kwargs) -> str: - """Return BASE_PROMPT as-is, no templating""" - return self.BASE_PROMPT - -class ChunkedAgent(Agent): - """ - A language agent which can handle large prompt input by chunking. - It will use tiktoken to estimate token usage and ensure that the message payload - sent each trip is below `chunk_max`, set at init time. - - Each subsequent step() will append the output from the previous run as context. - - After the run is finished, all answers will be appended together into a single string and assigned to the answer attribute. 
- """ - answer_cache : list[str] = [] - prompt_len : int - full_question : str - chunk_max : int - - def __init__(self, question: str, model_name: str, llm: openai.OpenAI | None = None, chunk_max : int = 3000, **oai_kwargs): - # Also save full input to full_question attribute since we'll - # overwrite self.question if the resulting payload is too large - self.full_question = question - self.chunk_max = chunk_max - # Get tokenizer to handle chunking responses if needed - try: - self.tokenizer = tiktoken.encoding_for_model(model_name) - except: - self.tokenizer = tiktoken.get_encoding("cl100k_base") - - # Take the base prompt length before we fstring it - self.prompt_len = len(self.tokenizer.encode(self.BASE_PROMPT.format(question="") + self.SYSTEM_PROMPT)) - - super().__init__(question, model_name, llm, **oai_kwargs) - - def combine_answer_cache(self) -> None: - """ - Combine possibly >1 response into a the final answer string - """ - self.answer = "\n".join(self.answer_cache) - - def fetch_last_response(self) -> Optional[dict[str, str]]: - """ - If step > 1, returns last response to use as context for the next. - Otherwise, we'd only include the system query and base query (including possibly only a single chunk of the input) - """ - if len(self.answer_cache): - out = { - "role": "user", - "content": "Here is the output of previous chunk you worked on, for context:\n {}".format(self.answer_cache[-1]) - } - else: - out = None - - return out - - def get_next_messages(self) -> list[dict[str, str]]: - """ - Retrieve next message payload for GPT prompting, and append previous output for context, if needed. - """ - out = super().get_next_messages() - if (last_translation_message := self.fetch_last_response()) is not None: - out.append(last_translation_message) - - return out - def get_prompt_len(self) -> int: - """ - Return base prompt length before using fstring to fill in template. - (also accounting for possible previous translation context) - """ - if (last_translation_message := self.fetch_last_response()) is not None: - prompt_len = self.prompt_len + len(self.tokenizer.encode(last_translation_message["content"])) - else: - prompt_len = self.prompt_len - - return prompt_len - - def format_prompt(self, split_expr: str = "\n{2,}?", join_str: str = "\n\n", **kwargs) -> str: - """ - Formatting BASE_QUERY, checking for output length and chunking self.question if necessary - - :param split_expr (str): A string or regex to pass to re.split() to split self.question into chunks. - :param join_str (str): A string to use to recompose chunks of self.question back together. - - NOTE: split_expr and join_str can be different (ex. '\\n{2, }?', and '\\n\\n'), - but join_str should always produce output that could be split on subsequent calls using split_expr. 
- """ - prompt_len = self.get_prompt_len() - - input_chunks = re.split(split_expr, self.question) - excess = [] - - # pop chunk by chunk until we have a message payload less than the requested max - while len(self.tokenizer.encode(join_str.join(input_chunks))) + prompt_len > self.chunk_max: - # Store excess message payload in question object - excess.append(input_chunks.pop()) - - # Reverse things around and re-join to string - # to get things the right way around - self.question = join_str.join(reversed(excess)) - - return re.sub("\s+", " ", self.BASE_PROMPT).format(question=join_str.join(input_chunks)).strip() - - def step(self): - # Prompt LLM - llm_prompt_input = self.get_next_messages() - answer = self.prompt_agent(llm_prompt_input, n_tok = 2 * self.chunk_max).message.content - self.scratchpad += f"=== Input {self.curr_step} ==========\n" - self.scratchpad += "\n".join(msg["content"] for msg in llm_prompt_input) - self.scratchpad += "\n===================================\n" - self.scratchpad += f"\n=== Answer {self.curr_step} =====\n" - self.scratchpad += answer + "\n" - self.scratchpad += "\n===================================\n" - - # Append answer to cache and continue - self.answer_cache.append(answer) - - # End run - self.terminated = len(self.question) == 0 - self.curr_step += 1 - - def run(self, reset: bool = False) -> None: - super().run(reset) - self.combine_answer_cache() - - def reset(self) -> None: - self.answer_cache = [] - return super().reset() - -class ToolAwareAgent(Agent): - """ - A base-class for an agent which can utilize OpenAI tool calls. - Subclasses would be expected to extend the TOOLS attribute to include additional - tools / functions. Each added tool should be appended to the base attribute at init. - - In addition, each added tool must have a corresponding class method that can be invoked - during step() if the GPT calls it. 
- """ - - TOOLS : list[dict] - # Will always be added to TOOLS - # (required to finalize) - submit_tool : dict = { - # Submit (final response) - "type": "function", - "function": { - "name": "call_submit", - "description": "Submit the final response back to user", - "parameters": { - "type": "object", - "properties": { - "input": { - "type": "string", - "description": "Final response to user" - } - }, - "required": ["input"] - } - } - } - - # Payload to send back in subsequent steps - tool_res_payload : list[dict] - - def __init__(self, question: str, model_name: str, llm: openai.OpenAI | None = None, tools: Optional[Union[dict, list[dict]]] = None, submit_tool: bool = True, **oai_kwargs): - self.TOOLS = [] - self.tool_res_payload = [] - if tools is not None: - if isinstance(tools, list): - self.TOOLS.extend(tools) - else: - self.TOOLS.append(tools) - if submit_tool: - self.TOOLS.extend(self.submit_tool) - - super().__init__(question, model_name, llm, **oai_kwargs) - - def prompt_agent(self, prompt: Union[dict[str, str], list[dict[str, str]]], n_tok: Optional[int] = None, tool_use : Literal["required", "auto", "none"] = "auto"): - - out = super().prompt_agent(prompt, n_tok, tools=self.TOOLS, tool_choice=tool_use) - - if out is not None: - # Append GPT response to next payload - # NOTE: This has to come before the next step of parsing - self.tool_res_payload.append(deepcopy(out.message)) - - # attempt to parse tool call arguments - if out.finish_reason == "tool_calls": - for i, tool in enumerate(out.message.tool_calls): - out.message.tool_calls[i].function.arguments = json.loads(tool.function.arguments) - - return out - def get_next_messages(self) -> list[dict[str, str]]: - """ - Get next message payload for GPT, possibly appending tool call result output, if present. - """ - out = super().get_next_messages() # If we have existing tool response messages, append them if len(self.tool_res_payload): out.extend(self.tool_res_payload) return out - - def _handle_tool_calls(self, response: Choice): + + def _handle_tool_calls(self, response): """ Handle all tool calls in response object @@ -508,16 +339,18 @@ def _handle_tool_calls(self, response: Choice): The output of that method is appended to a new message in the tool_res_payload list, for downstream querying. """ for tool in response.message.tool_calls: + self.scratchpad += "--- Evaluating Toolcalls -----------------\n" # Try to call tool, if present, else raise. 
try: fun : Callable = getattr(self, tool.function.name) # OpenAI returns as str, which should hopefully eval to dict kwargs : dict[str, any] = tool.function.arguments - self.scratchpad += f"=> Requested toolcall: {tool.function.name}({str(kwargs)[:30] + '...'} <=\n" - logger.info(f"Got tool call: {tool.function.name}({str(kwargs)[:30] + '...'})") - tool_result = fun(**kwargs) + + self.scratchpad += f"\t=> {tool.function.name}()\n" + self.scratchpad += tool_result if type(tool_result) == str else repr(tool_result) + "\n\n" + self.tool_res_payload.append( { "tool_call_id": tool.id, @@ -526,126 +359,88 @@ def _handle_tool_calls(self, response: Choice): } ) except Exception as e: - logger.error(f"Tool call {tool} failed.") + logger.error(f"Tool call {tool.function.name} failed.") raise e - def step(self): - # Pull base query + system messages - # (abstract) - llm_prompt_input = self.get_next_messages() + self.scratchpad += "---------------------------------\n\n" - # Send off messages for reply - self.scratchpad += f"=== Input {self.curr_step} ==========\n" - self.scratchpad += "\n".join(msg["content"] for msg in llm_prompt_input if not isinstance(msg, ChatCompletionMessage)) - self.scratchpad += "\n===================================\n" - - # Attempt to query GPT and handle invalid JSON parsing of args - response = None - n_retry = 3 - while response is None and n_retry > 0: - try: - response = self.prompt_agent(llm_prompt_input) - except json.decoder.JSONDecodeError as e: - if n_retry == 0: - raise e - else: - logger.warn(f"Tool calls in response couldn't be decoded. {n_retry} retries remaining.") - llm_prompt_input.append( - { - "role": "user", - "content": "The arguments to your previous tool call couldn't be parsed correctly. Please ensure you properly escapse quotes and construct a valid JSON payload." - } - ) - n_retry -= 1 - continue - if response is None: - logger.warning("No response after 3 retries, Terminating!") - self.truncated = True - else: - if response.finish_reason == "length": - # Determine if we're truncated - self.truncated = True - logger.warn("Response truncated due to length, Terminating!") - # Recursive call if tool calls in response - elif response.finish_reason == "tool_calls": - self._handle_tool_calls(response) - - # End Step - self.curr_step += 1 - - def call_submit(self, input: str, clean: bool = False) -> None: + def reset(self) -> None: """ - Final response call, which terminates further processing + Reset agent state for a re-run """ - out_msg = self.clean_response(input) if clean else input - logger.info(f"Received final response: {out_msg}") - self.scratchpad += "===== Answer ==========\n" - self.scratchpad += out_msg - self.answer = out_msg - self.terminated = True - - def reset(self) -> None: + self.scratchpad = "" + self.answer = "" self.tool_res_payload = [] - return super().reset() + self.callback_output = [] + self.curr_step = 1 + self.truncated = False + self.terminated = False - def format_prompt(self) -> str: - return re.sub("\s+", " ", self.BASE_PROMPT).format(question=self.question) - - @staticmethod - def _subprocess_tool_call_on_file(tool_input: str, cmd_args: list[str], output_type: Literal["stdout", "file"] = "stdout") -> str: + def dump(self, outfile): """ - A helper function that writes `tool_input` to a file and runs a python module on that file, either returning stdout+stderr or the contents of the file after the subprocess call. 
- - :param tool_input (str): A string to pass as input to the tool (this is likely code) - :param cmd_args (list[str]): Command-line args between the python -m call and the file name (should include the python module to call and any additional arguments) - :param output_type (str): The output to return (either stdout+error, or contents of the tempfile, if this is modified) - - :return: Either stdout and stderr concatenated into a string and separated by a newline, or `tool_input` after calling the python module + Dump scratchfile to disk """ - with tempfile.TemporaryFile("w", delete=False) as file: - file.write(tool_input) - file.close() - - # Run mypy in a subprocess and capture stderr and stdout - out = subprocess.run( - [sys.executable, "-m", *cmd_args, file.name], - capture_output=True - ) - if output_type == "stdout": - return "\n".join([out.stdout.decode("utf-8"), out.stderr.decode("utf-8")]) - elif output_type == "file": - with open(file.name, "r") as f: - out = f.read() - return(out) - else: - # Shouldn't be reachable - raise NotImplementedError() - -class EnvAgent(Agent): + with open(outfile, "w", encoding="utf-8") as file: + file.writelines(elem + "\n" for elem in self.scratchpad.split("\n")) + +class StructuredOutputAgent(Agent): """ - A Base (abstract) class for language agents which interact with an environment - (as implemented by gymnasium) + An Agent with accepts a pydantic BaseModel to use as a tool / validator for model output + + A class method is constructed at runtime along with a stopping condition which triggers when a `response_model` object is detected in the response. """ - correct: bool = False + answer: dict[str, Any] - def __init__(self, question: str, model_name: str, llm: openai.OpenAI, env: gym.Env): - self.env = env - super().__init__(question, model_name, llm) - - def is_truncated(self) -> bool: - # NOTE: I think they also checked that prompt length - # was under a certain value here, but that'd mean - # importing tiktoken and computing it each step - return super().truncated() and not self.correct + def __init__(self, response_model: type[BaseModel], model_name: str, expected_len: Optional[int] = None, stopping_condition=None, llm=None, tools=None, callbacks=None, oai_kwargs=None, **fmt_kwargs): + """ + Language Agent with structured output - def prompt_agent(self, prompt: dict[str, str] | list[dict[str, str]], n_tok: int = 100, **oai_kwargs) -> str: - out_msg = super().prompt_agent(prompt, n_tok, **oai_kwargs) + Handles creating a class method that is used to validate the tool call in the response body at runtime. + Also constructs it's own stopping condition which triggers when a `response_model` object is detected in the response (and is parsed correctly). 
- out = self.clean_response(out_msg.message.content) + "\n" + :param BaseModel response_model: A data model to use for structured output + :param str model_name: Name of OpenAI model to use (or deployment name for AzureOpenAI) + :param int expected_len: Optional length constraint on the response_model (OpenAI API doesn't allow maxItems parameter in schema so this is checked post-hoc) + :param _StoppingCondition stopping_condition: A handler that signals when an Agent has completed the task + :param AsyncOpenAI llm: Instantiated OpenAI instance to use (optional) + :param List[dict] tools: List of tools the agent can call via response (optional) + :param List[Callable] callbacks: List of callbacks to evaluate at end of run (optional) + :param dict[str, any] oai_kwargs: Dict of additional OpenAI arguments to pass thru to chat call + :param fmt_kwargs: Additional named arguments which will be inserted into the :func:`BASE_PROMPT` via fstring + + """ + self.response_model = response_model + self.output_len = len(self.response_model.model_fields) + + if stopping_condition is None: + stopping_condition = StopOnDataModel(response_model) + else: + logger.warning("StructuredOutputAgent assumes a `StopOnBaseModel` stopping condition, but you passed another at runtime which will take precedence. This may lead to errors.") - return out + oai_tool = openai.pydantic_function_tool(response_model) + + # Ensure we don't modify external tools + tools_internal = copy(tools) + + if tools_internal is not None: + tools_internal.append(oai_tool) + else: + tools_internal = [oai_tool] - def reset(self): - super().reset() - self.env.reset() - self.correct = False + # Assign a class method + fun_name = oai_tool["function"]["name"] + setattr(self, fun_name, response_model_handler(self.response_model)) + + super().__init__(model_name, stopping_condition, llm, tools_internal, callbacks, oai_kwargs, **fmt_kwargs) + + def reset(self) -> None: + """ + Reset agent state for a re-run + """ + self.scratchpad = "" + self.answer = {} + self.tool_res_payload = [] + self.callback_output = [] + self.curr_step = 1 + self.truncated = False + self.terminated = False \ No newline at end of file diff --git a/agents/agent/prediction.py b/agents/agent/prediction.py index c923be2..18cdbcc 100644 --- a/agents/agent/prediction.py +++ b/agents/agent/prediction.py @@ -1,71 +1,81 @@ """ -Structured prediction agent (predicting from a list of choices) +Structured prediction agents (predicting from a list of choices) Sean Browning (oet5) """ import logging -from typing import List, Literal, Optional +from typing import Callable, List, Literal, Optional, Any import openai import polars as pl import pydantic -from .base import ToolAwareAgent +from ..abstract import _StoppingCondition +from .base import Agent, StructuredOutputAgent logger = logging.getLogger(__name__) -class PredictionAgent(ToolAwareAgent): +class PredictionAgent(StructuredOutputAgent): """ - A language agent which returns a structured prediction + A language agent which returns a structured prediction given a polars DataFrame input. 
+ + The result from the agent should be a `dict[str, list]` with one element, "labels", of `len(df.height)` """ - answer: list[str] def __init__( self, - prompt: str, - persona: str, - model_name: str, df: pl.DataFrame, labels: list[str], - llm: openai.OpenAI | None = None, - **oai_kwargs, + model_name: str, + stopping_condition: Optional[_StoppingCondition] = None, + llm: Optional[openai.AsyncOpenAI] = None, + tools: Optional[List[dict]] = None, + callbacks: Optional[List[Callable]] = None, + oai_kwargs: Optional[dict[str, Any]] = None, + **fmt_kwargs ): """ - A ToolAwareAgent where the output is a prediction for each row of `df` from one of the choices in `labels` - - :param prompt: A prompt which becomes the first user message and should explain how to classify `df` - :param persona: A system message / persona - :param model_name: The Azure OpenAI model name to use - :param df: The data to classify - :param labels: The set of labels the model will choose from - :param llm: (optional) An OpenAI instance, otherwise one will be created via ClientSecret credentials internally - :param oai_kwargs: Additional key word args to pass to the OpenAI Chat Completion API (temperature, top_k, etc.) - + An Agent where the output is a prediction for each row of `df` from one of the choices in `labels` + + + :param df: The data to classify (the ndJSON format of this will be available as `df` to format the BASE_PROMPT) + :param labels: The set of categorical labels the model can choose from + :param str model_name: Name of OpenAI model to use (or deployment name for AzureOpenAI) + :param _StoppingCondition stopping_condition: A handler that signals when an Agent has completed the task (optional) + :param AsyncOpenAI llm: Instantiated OpenAI instance to use (optional) + :param List[dict] tools: List of tools the agent can call via response (optional) + :param List[Callable] callbacks: List of callbacks to evaluate at end of run (optional) + :param dict[str, any] oai_kwargs: Dict of additional OpenAI arguments to pass thru to chat call + :param fmt_kwargs: Additional named arguments which will be inserted into the :func:`BASE_PROMPT` via fstring """ self.labels = labels self.expected_n = df.height - self.df = df - self.SYSTEM_PROMPT = persona - self.BASE_PROMPT = prompt - self._build_pydantic_model() + response_model = self._build_pydantic_model() + super().__init__( - question=prompt, + response_model, model_name=model_name, + expected_len=self.expected_n, + stopping_condition=stopping_condition, llm=llm, - tools=self.response_tool, - submit_tool=False, - **oai_kwargs + tools=tools, + callbacks=callbacks, + oai_kwargs=oai_kwargs, + df=df.write_ndjson(), + **fmt_kwargs ) - def _build_pydantic_model(self): + def _build_pydantic_model(self) -> type[pydantic.BaseModel]: """ - Construct a pydantic model that we'll use to force the LLM to return a structured resposne + Construct a pydantic model that we'll use to force the LLM to return a structured response """ - self.response_model = pydantic.create_model( + response_model = pydantic.create_model( "classify", labels=( + # HACK: This is a bodge to build a model with labels only known at runtime + # but it will fail static typing in doing so List[Literal[tuple(self.labels)]], pydantic.Field( alias="labels", @@ -74,65 +84,41 @@ def _build_pydantic_model(self): ), ) - self.response_tool = openai.pydantic_function_tool( - self.response_model, - name="classify", - description="Classify the data using one of the possible categories", - ) + return response_model + + 
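+
+# Illustrative usage sketch (hypothetical; not one of the repository's examples).
+# The subclass, prompts, label set, and model/deployment name below are assumptions,
+# and Azure OpenAI credentials are assumed to already be configured as expected by
+# Agent.authenticate(). With the default StopOnDataModel stopping condition, the
+# final answer should be a dict with a "labels" entry of length df.height.
+#
+#   import asyncio
+#   import polars as pl
+#
+#   class TriageAgent(PredictionAgent):
+#       SYSTEM_PROMPT = "You are an expert triage nurse."
+#       BASE_PROMPT = "Classify each of the following records:\n{df}"
+#
+#   data = pl.DataFrame({"note": ["fever and cough", "routine check-up"]})
+#   agent = TriageAgent(data, labels=["urgent", "non-urgent"], model_name="gpt-4o")
+#   answer = asyncio.run(agent())   # e.g. {"labels": ["urgent", "non-urgent"]}
+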
+class PredictionAgentWithJustification(PredictionAgent): + """ + A PredictionAgent which returns both a structured label output along with a short text justification for the prediction. - def format_prompt(self) -> str: + The label and justification are supplied in the same tool call / response body rather than in separate messages to improve coherence. + """ + def _build_pydantic_model(self): """ - Return our prompt as-is, because we pre-formatted it + Construct a pydantic model that we'll use to force the LLM to return a structured response. + This will also include a justification for the classification. """ - return ( - self.BASE_PROMPT - + f"\n{self.expected_n} potential cases:\n" - + self.df.write_ndjson() + + self.response_model = pydantic.create_model( + "classify", + labels=( + List[Literal[tuple(self.labels)]], + pydantic.Field( + alias="labels", + description="Classify the input data into one of the possible categories", + ) + ), + justification=( + List[str], + pydantic.Field( + alias="justification", + description="SHORT description explaining your reasoning for the classfication" + ) + ) ) - def classify(self, labels: list[str]) -> Optional[str]: - """ - Function to "classify" ADE, which inspects that the LLM provided a response of the correct length and used only the labels provided - """ - if len(labels) != self.expected_n: - logger.warning(f"Invalid return length: {len(labels)}, retrying.") - return f"Input was of length {self.expected_n} but you returned a response of length {len(labels)}. Please try again." - - try: - # End our run if we make it through this - parsed_args = self.response_model(labels=labels) - self.answer = parsed_args.labels - self.terminated = True - self.scratchpad += "===== Answer ==========\n" - self.scratchpad += str(self.answer) - logger.info("Got valid response, terminating.") - except pydantic.ValidationError: - logger.warning(f"Response didn't pass pydantic validation, retrying.") - # HACK: the default message from pydantic would be pretty long and lead to context length issues - # so I'm making my own - invalid_labels = [val for val in labels if val not in self.labels] - return f"Pydantic validation of function call failed because you passed the following invalid label(s): {invalid_labels}. Please retry using ONLY the labels allowed." 
- - def reset(self) -> None: - self.tool_res_payload = [] - self.scratchpad = "" - self.answer = [] - self.curr_step = 1 - self.truncated = False - self.terminated = False - - def run(self, reset: bool = False, steps: Optional[int] = None) -> None: - """ - Run the agent, optionally running only a fixed number of steps - """ - if reset: - self.reset() - - if steps is not None: - for _ in range(steps): - logger.debug(f"Running step {self.curr_step}.") - self.step() - if self.is_terminated() or self.is_truncated(): - break - else: - super().run(reset) \ No newline at end of file + self.response_tool = openai.pydantic_function_tool( + self.response_model, + name="classify", + description="Classify the data using one of the possible categories and, for each classification, provide a short description of your reasoning.", + ) \ No newline at end of file diff --git a/agents/agent/reflexion.py b/agents/agent/reflexion.py deleted file mode 100644 index 335acb8..0000000 --- a/agents/agent/reflexion.py +++ /dev/null @@ -1,197 +0,0 @@ -""" -Agents (React & Relexion) -(based from Reflexion repo, all credit to Noah Shinn and team) -https://github.com/noahshinn/reflexion -""" - -import logging -from typing import Literal - -import gymnasium as gym -import openai - -from .base import EnvAgent - -logger = logging.getLogger(__name__) - -class ReactAgent(EnvAgent): - BASE_PROMPT = """Solve a question answering task with interleaving Thought, Action, Observation steps. - Thought can reason about the current situation, and Action can be three types: - (1) Search[entity], which searches the exact entity on Wikipedia and returns the first paragraph if it exists. If not, it will return some similar entities to search. - (2) Lookup[keyword], which returns the next sentence containing keyword in the last passage successfully found by Search. - (3) Finish[answer], which returns the answer and finishes the task. - You may take as many steps as necessary, but only respond with the step requested at the end of this message. - Here are some examples: - {examples} - (END OF EXAMPLES) - - Question: {question}{scratchpad}""" - - def __init__( - self, - question: str, - examples: str, - model_name: str, - llm: openai.OpenAI, - env: gym.Env, - ): - super().__init__(question, model_name, llm, env) - self.examples = examples - - def step(self) -> None: - """ - Main Agent interaction logic, each step consists of three sub-steps: - 1. Think - 2. Act - 3. Observe - """ - - # Think - logger.info("thinking...") - self.scratchpad += f"\nThought {self.curr_step}: " - self.scratchpad += self.prompt_agent(self.format_prompt()) - - # Act - logger.info("getting action...") - self.scratchpad += f"\nAct {self.curr_step}: " - action = self.prompt_agent(self.get_next_messages()) - self.scratchpad += action - - # Observe - logger.info("executing action and recieving observation...") - self.scratchpad += f"\nObservation {self.curr_step}: " - obs, self.correct, self.terminated, self.truncated, self.curr_step = ( - self.env.step(action) - ) - self.scratchpad += obs + "\n" - - def format_prompt(self) -> str: - """ - Format the base prompt with dynamic content - using f-string plus kwargs - """ - return self.BASE_PROMPT.format( - examples=self.examples, question=self.question, scratchpad=self.scratchpad - ) - - -class ReactandReflectAgent(ReactAgent): - BASE_PROMPT = """Solve a question answering task with interleaving Thought, Action, Observation steps. 
Thought can reason about the current situation, and Action can be three types: -(1) Search[entity], which searches the exact entity on Wikipedia and returns the first paragraph if it exists. If not, it will return some similar entities to search. -(2) Lookup[keyword], which returns the next sentence containing keyword in the last passage successfully found by Search. -(3) Finish[answer], which returns the answer and finishes the task. -You may take as many steps as necessary. -Here are some examples: -{examples} -(END OF EXAMPLES) - -{reflections} - -Question: {question}{scratchpad}""" - REFELECTION_PROMPT = """You are an advanced reasoning agent that can improve based on self refection. You will be given a previous reasoning trial in which you were given access to an Docstore API environment and a question to answer. You were unsuccessful in answering the question either because you guessed the wrong answer with Finish[], or you used up your set number of reasoning steps. In a few sentences, Diagnose a possible reason for failure and devise a new, concise, high level plan that aims to mitigate the same failure. Use complete sentences. -Here are some examples: -{examples} - -Previous trial: -Question: {question}{scratchpad} - -Reflection:""" - FAILED_TRIAL_HEADER = ( - "You have attempted to answer the following question before and failed." - ) - REFLECTION_HEADER = "The following reflection(s) give a plan to avoid failing to answer the question in the same way you did previously. Use them to improve your strategy of correctly answering the given question." - LAST_TRIAL_HEADER = ( - "Below is the last trial where you attempted to answer the question." - ) - reflections: list[str] = [] - reflection_str: str = "" - - def __init__( - self, - question: str, - examples: str, - reflection_strategy: Literal[ - "last_attempt", "reflexion", "last_attempt_reflexion" - ], - model_name: str, - llm: openai.OpenAI, - env: gym.Env, - ): - super().__init__(question, examples, model_name, llm, env) - self.strategy = reflection_strategy - pass - - def run(self, reset: bool = False) -> None: - """ - Run standard React logic, but add in a reflection step if the agent failed previously - """ - if (self.is_terminated() or self.is_truncated()) and not self.env.is_correct(): - self.reflect() - - super().run(reset) - - def format_prompt(self) -> str: - """ - Format the base prompt with dynamic content - using f-string plus kwargs - """ - return self.BASE_PROMPT.format( - examples=self.examples, - question=self.question, - scratchpad=self.scratchpad, - reflections=self.reflection_str - ) - - def format_reflection_prompt(self) -> dict[str, str]: - """ - Format the reflection prompt with dynamic content - using f-string plus kwargs - """ - fmt_prompt = self.REFELECTION_PROMPT.format( - examples=self.examples, - question=self.question, - scratchpad=self.scratchpad - ) - - return {"role": "user", "content": fmt_prompt} - - def reflect(self) -> None: - """ - Reflect on failure to hopefully provide clues for - how to solve the problem in the next iteration - """ - - logger.debug("Reflecting.") - - self.reflection_str = self.FAILED_TRIAL_HEADER + "\n" - - if self.strategy == "last_attempt": - self.reflection_str += self.LAST_TRIAL_HEADER + "\n" - self.reflection_str += f"Question: {self.question}\n" - self.reflection_str += self.scratchpad - self.reflection_str += "(END PREVIOUS TRIAL)\n" - elif self.strategy == "reflexion": - self.reflections.append( - self.prompt_agent(self.format_reflection_prompt(), n_tok=250) - ) - 
self.reflection_str += self.REFLECTION_HEADER + "\n" - self.reflection_str += "\n- ".join(self.reflections) - elif self.strategy == "last_attempt_reflexion": - self.reflection_str += self.LAST_TRIAL_HEADER + "\n" - self.reflection_str += f"Question: {self.question}\n" - self.reflection_str += self.scratchpad - self.reflection_str += "(END PREVIOUS TRIAL)\n" - self.reflections.append( - self.prompt_agent(self.format_reflection_prompt(), n_tok=250) - ) - self.reflection_str += self.REFLECTION_HEADER + "\n" - self.reflection_str += "\n- ".join(self.reflections) - else: - raise NotImplementedError("Unknown Reflexion strategy: {self.strategy}") - - logger.debug(f"got reflection string:\n{self.reflection_str}") - - def reset(self) -> None: - self.reflections = [] - self.reflection_str = "" - super().reset() diff --git a/agents/agent/sas.py b/agents/agent/sas.py deleted file mode 100644 index 2d5e91e..0000000 --- a/agents/agent/sas.py +++ /dev/null @@ -1,300 +0,0 @@ -import logging -import re - -from openai import OpenAI - -from .base import PersistentAgent, ReduceAgent, ChunkedAgent, ToolAwareAgent - -logger = logging.getLogger(__name__) - -class OutlineSummarizeAgent(ReduceAgent): - """ - A simple agent that summarizes all the outlines provided by CodeOutlinerAgent - """ - SYSTEM_PROMPT : str = "You are an expert at creating highly technical and thorough summaries and outlines of computer programs" - BASE_PROMPT : str = """ - The following messages are all technical outlines another AI assistant produced pertaining to sections of the same computer program. - Please read and combine them to produce a detailed summary of the full program described in the same style. - You should retain all pertinent data so this outline can be used as a guide to write a program from scratch without seeing the underlying script. - """ - -class SAStoPythonAgent(PersistentAgent): - """ - A very simple, no frills, SAS -> Python code conversion agent. - """ - SYSTEM_PROMPT : str = "You are an expert at statistical and computer programming and can translate input SAS code to Python. You return only code as output." - BASE_PROMPT : str = "Convert the following SAS code into Python code. This is part of a larger script and you may only see on piece at a time. Attempt to convert all code, even if it's highly reliant on input data. You may use any library imports necessary to complete the task.\n```sas\n{question}\n```" - APPEND_PROMPT : str = "Refine your answer using the following reflections\n{obs}" - # Regex to extract python script from OpenAI response - # (allowing multiple cases because GPT-4 isn't consistent) - PYSCRIPT_CONTENT = re.compile(r"```[pP][ython]*\n(.+?)```", re.DOTALL) - - def step(self): - """ - Full Agent logic. Prompts LLM and saves answer - """ - super().step() - # Extract just the Python code from the response - self.answer = "\n".join(self.extract_pyscripts(self.answer)) - - def format_prompt(self) -> str: - return self.BASE_PROMPT.format(question=self.question).strip() - - def extract_pyscripts(self, answer: str) -> list[str]: - py_scripts = [script.group(1) for script in self.PYSCRIPT_CONTENT.finditer(answer)] - logger.info(f"Extracted {len(py_scripts)} python chunk from response.") - - return(py_scripts) - -class PythonReflexionAgent(PersistentAgent): - SYSTEM_PROMPT : str = "You are an advanced reasoning agent that can improve Python code based on self refection." - BASE_PROMPT : str = "A languge agent was given a task to translate an ETL/cleaning pipeline from SAS code into Python code. 
Review the code below and devise a high-level plan to improve the output script using best Python data science coding practices. Use complete sentences.\n{question}" - APPEND_PROMPT : str = "The script was modified according to your previous reflections. Review this new code and devise a new plan\n```python\n{obs}\n```" - - def format_prompt(self) -> str: - return self.BASE_PROMPT.format(question=self.question).strip() - -class PseudocodeSummarizeAgent(ReduceAgent): - """ - An agent which produces whole program pseudocode from several plain-language program outlines - """ - SYSTEM_PROMPT : str = "You are an expert in computer science and can produce legible pseudo-code from plain-text descriptions of computer programs" - BASE_PROMPT : str = """ - The following messages are all technical outlines another AI assistant produced pertaining to sections of the same computer program. - Please use these descriptions to re-create the full program being described using pseudo-code. You may generalize functions as necessary, but be sure that all funcationality makes it to the final product. - Your output should contain only psuedocode, no plaintext. - - ex. - - Input: [ - "The the program defines two integer variables, x = 2 and y = 3", - "The program defines a function, sum, that takes two integer parameters, a and b, and adds them together and returns an integer sum" - "The program runs the sum() function on x and y and assigns the value to variable z" - ] - - Output: ``` - let x : integer = 2; - let y : integer = 3; - def sum( x : integer, y : integer) -> integer { return (x + y) } - - z = sum(x,y) - ``` -""" - -class PythonSummarizeAgent(ReduceAgent): - """A simple agent that summarizes the several python scripts already generated""" - SYSTEM_PROMPT : str = "You are an expert python programmer that can reorganize and piece together a functional script from several fragments" - BASE_PROMPT : str = "A previous AI Agent translated a SAS script into Python in several chunks. Please combine all these chunks into a single valid python program. Please ensure that all the pieces are incorperated in the final script. Please return only the script as output in a ```python block" - - def get_next_messages(self) -> list[dict[str, str]]: - out = super().get_next_messages() - out.append({ - "role": "assistant", - "content": "\n".join(self.question) - }) - - return out -class CodeOutlinerAgent(ChunkedAgent): - """ - A language agent which, given input code, provides an outline of inputs, outputs, and transformations in plain language - for general understanding and downstream translation tasks - """ - - SYSTEM_PROMPT: str = "You are a coding expert capable of reading code in any langauge and providing coherent outlines of the underlying purpose and logic" - BASE_PROMPT: str = """ - Please outline the following code using bullets and plain language. You may only see part of the script at a time, so use any provided context as a guide to previously viewed code to incorperate into your response. - In your response, please provide a short description of what the program accomplishes, a list of inputs and outputs, and a step-by-step outline of any algorithms or transformations contained in the code. - Your response will be used downstream by other agents producing code in a different language, so please provide clear instructions that could be followed to produce the same script. 
- - ```sas - {question} - ``` - """ - answer : list[str] = [] - - def combine_answer_cache(self) -> None: - """ - Overload this method to return all answers as a list - instead of appended together - """ - self.answer = self.answer_cache - -class SASConvertAgent(ChunkedAgent): - """ - The language agent responsible for producing Python files from input SAS scripts. - """ - SYSTEM_PROMPT: str = "You are a coding expert in the statistical language, SAS, and Python and able to assist with converting between the two." - BASE_PROMPT: str = """ - I am converting an existing SAS script into Python. The file is quite long, so you may only see part of it at a time. - Please help me re-write this script using syntactically correct Python and any data science libraries (ex. numpy, pandas, polars) that might be needed. - If more than one script is needed, please place the file contents in separate ```python blocks. - Please provide ONLY the output code. - - Here's an outline of the script with some context that might be helpful: - {context} - - Here is my SAS file: - ```sas - {{question}} - ``` - """ - # Regex to extract python script from OpenAI response - # (allowing multiple cases because GPT-4 isn't consistent) - PYSCRIPT_CONTENT = re.compile(r"```[pP][ython]*\n(.+?)```", re.DOTALL) - - answer : list[str] = [] - - def __init__(self, question: str, context: str, model_name: str, llm: OpenAI | None = None, chunk_max: int = 3000): - self.context = context - self.BASE_PROMPT = self.BASE_PROMPT.format(context=self.context) - super().__init__(question, model_name, llm, chunk_max) - - def step(self): - # Prompt LLM for first-pass - llm_prompt_input = self.get_next_messages() - first_answer = self.prompt_agent(llm_prompt_input, n_tok = 2 * self.chunk_max).message.content - self.scratchpad += f"=== Input {self.curr_step} ==========\n" - self.scratchpad += "\n".join(msg["content"] for msg in llm_prompt_input) - self.scratchpad += "\n===================================\n" - - # Attempt to parse python scripts in response - ret_scripts = self.extract_pyscripts(first_answer) - self.scratchpad += f"\n=== Answer {self.curr_step} =====\n" - self.scratchpad += "\n".join(ret_scripts) - self.scratchpad += "\n===================================\n" - - self.answer_cache.extend(ret_scripts) - - # End run - self.terminated = len(self.question) == 0 - self.curr_step += 1 - - def combine_answer_cache(self) -> None: - """ - Overload this method to return all answers as a list - instead of appended together - """ - self.answer = self.answer_cache - - def extract_pyscripts(self, answer: str) -> list[str]: - py_scripts = [script.group(1) for script in self.PYSCRIPT_CONTENT.finditer(answer)] - logger.info(f"Extracted {len(py_scripts)} python chunk from response.") - - return(py_scripts) - -class PythonRefineAgent(ToolAwareAgent): - """ - A Language agent that attempts to refine input Python code using coding knowledge and local tool calls to mypy and black - """ - SYSTEM_PROMPT: str = "You are a Python coding expert and can identify and correct syntactical mistakes and fix incomplete code using standard conventions." - BASE_PROMPT: str = """ - I have converted part of an existing SAS script into Python. Due to length, the script may have been translated in chunks and the final results concatenated into a single script. - Please read this script and provide any corrections and re-organization as needed. Please also provide type hints, function docstrings, and guiding comments as needed. 
- You may call the mypy, black, and ruff tools to assist, and you may call both in parallel. - If no changes are needed, provide the script back using the call_submit tool. Always check the file first before submitting your final answer using call_submit. - Please provide ONLY the output code marked in a code block, no additional commentary is needed. - - Here is the python file: - ```python - {question} - ``` - """ - - TOOLS = [ - # Mypy - { - "type": "function", - "function": { - "name": "call_mypy", - "description": "Run the MyPy static typing tool on input code", - "parameters": { - "type": "object", - "properties": { - "code": { - "type": "string", - "description": "Python code to check using the mypy static typing tool" - } - }, - "required": ["code"] - } - } - }, - # Black - { - "type": "function", - "function": { - "name": "call_black", - "description": "Run the black python code formater on input python code", - "parameters": { - "type": "object", - "properties": { - "code": { - "type": "string", - "description": "Python code to check using black tool" - } - }, - "required": ["code"] - } - } - }, - # Ruff - { - "type": "function", - "function": { - "name": "call_ruff", - "description": "Run the ruff python code linter on input python code", - "parameters": { - "type": "object", - "properties": { - "code": { - "type": "string", - "description": "Python code to check using ruff tool" - } - }, - "required": ["code"] - } - } - } - ] - - @staticmethod - def call_black(code: str) -> str: - """ - Tool usable by language agent to return formatting help from Black - :param code: Python code to check against black - """ - try: - import black - except ImportError as e: - logger.error("black is mising, please install it with `pip install black`") - raise e - - return ToolAwareAgent._subprocess_tool_call_on_file(code, ["black"], output_type="file") - - @staticmethod - def call_ruff(code: str) -> str: - """ - Tool usable by language agent to return formatting help from Black - :param code: Python code to check against black - """ - try: - import ruff - except ImportError as e: - logger.error("ruff is mising, please install it with `pip install ruff`") - raise e - - return ToolAwareAgent._subprocess_tool_call_on_file(code, ["ruff", "check"], output_type="stdout") - - @staticmethod - def call_mypy(code: str) -> str: - """ - Tool usable by language agent to return formatting help from Black - :param code: Python code to check against black - """ - try: - import mypy - except ImportError as e: - logger.error("mypy is mising, please install it with `pip install mypy`") - raise e - - return ToolAwareAgent._subprocess_tool_call_on_file(code, ["mypy", "--install-types", "--non-interactive"]) diff --git a/agents/callbacks.py b/agents/callbacks.py new file mode 100644 index 0000000..f2ae264 --- /dev/null +++ b/agents/callbacks.py @@ -0,0 +1,56 @@ +""" +Callbacks for Agents +""" + +import abc + +from .agent import Agent + + +class _Callback(metaclass=abc.ABCMeta): + """ + A Callback virtual class + """ + @abc.abstractmethod + def __call__(self, cls: type[Agent], answer: str, scratchpad: str): + """ + Primary method called by agent during callback process + + :param Agent cls: Instantitated class of calling agent for possible modification + :param str answer: The final response of the calling Agent + :param str scratchpad: The full interaction history of the calling Agent + + """ + raise NotImplementedError() + + +class AgentCallback(_Callback): + """ + Call another agent with the answer and scratchpad of 
a completed agent + """ + def __init__(self, agent_class: type[Agent], **agent_kwargs): + """ + Create an Agent Callback, i.e. an Agent which will be called at the + end of an Agent run with the answer and scratchpad. + + The provided `agent_class` will be initialized at the end of the run with + `answer` and `scratchpad` variables passed to format the `BASE_PROMPT` + + Possible use cases could include reflection/reaction on llm agent feedback, + summarization of task, etc. + + :param Agent agent_class: Uninitialized Agent class to use in callback + :param agent_kwargs: All named arguments with which to initialize agent_class + """ + self.agent_class = agent_class + self.agent_kwargs = agent_kwargs + + async def __call__(self, cls: type[Agent], answer: str, scratchpad: str) -> None: + """ + Run new callback agent on calling agent's answer and scratchpad and append output. + """ + + self.callback_agent = self.agent_class(**self.agent_kwargs, answer=answer, scratchpad=scratchpad) + await self.callback_agent.run() + cls.callback_output.append(self.callback_agent.answer) + diff --git a/agents/decorators.py b/agents/decorators.py new file mode 100644 index 0000000..268feee --- /dev/null +++ b/agents/decorators.py @@ -0,0 +1,37 @@ +from functools import wraps, partial +from typing import Optional, Union +import pydantic +import logging + +logger = logging.getLogger(__name__) + +def response_model_handler(func: Optional[type[pydantic.BaseModel]], /, *, expected_len: Optional[int] = None): + """ + A decorator that wraps a Pydantic BaseModel and returns string output in the event of parsing errors or the correctly parsed model object otherwise. + + :param BaseModel func: The Pydantic BaseModel to wrap + :param int expected_len: The expected length of args in the parsed output (optional, currently HACK-y since it applies the same value to all args) + """ + if not func: + return partial(response_model_handler, expected_len=expected_len) + + @wraps(func) + def inner_wrapper(*args, **kwargs) -> Union[pydantic.BaseModel, str]: + + try: + parsed = func(*args, **kwargs) + except pydantic.ValidationError as err: + logger.warning(f"Response didn't pass pydantic validation") + return f"Pydantic validation failed:\n{str(err)}" + + if expected_len: + # Determine if length mismatch exists + vars_mismatch = [f"{var}: {len(arr)} != {expected_len}" for var, arr in parsed.model_dump().items() if len(arr) != expected_len] + + if len(vars_mismatch): + logger.warning(f"Got the following length mismatches in parsed args:\n{vars_mismatch}") + return "Length mismatch in return:\n{}".format("\n".join(vars_mismatch)) + + return parsed + + return inner_wrapper \ No newline at end of file diff --git a/agents/env.py b/agents/env.py deleted file mode 100644 index 153ea1f..0000000 --- a/agents/env.py +++ /dev/null @@ -1,146 +0,0 @@ -""" -A Wikipedia QA Gymnasium / Environment -(almost entirely lifted / based from Reflexion repo, all credit to Noah Shinn and team) -https://github.com/noahshinn/reflexion -""" - -import logging -import re -import string -from typing import Union - -import gymnasium as gym -from langchain_community.docstore.wikipedia import Wikipedia - -from .react import DocstoreExplorer -from .ssl_tools import no_ssl_verification - -logger = logging.getLogger(__name__) - - -class WikiQAEnv(gym.Env): - def __init__(self, question: str, truth: str, max_steps: int = 6): - self.explorer = DocstoreExplorer(Wikipedia()) - self.question = question - self.truth = truth - self.max_steps = max_steps - - self.reset() - - 
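As a rough sketch of how the `response_model_handler` decorator above might be applied (the `Answer` model, its field, and the example values are purely illustrative; only the decorator itself comes from `agents/decorators.py`):

```python
from typing import List, Literal

import pydantic

from agents.decorators import response_model_handler


class Answer(pydantic.BaseModel):
    """A hypothetical structured response expected back from a tool call."""
    answer: List[Literal["A", "B", "C", "D"]]


# Wrapping the model means parsing failures come back as a plain string the agent
# can hand to the LLM as a tool response instead of raising an exception
parse_answer = response_model_handler(Answer, expected_len=2)

parse_answer(answer=["A", "C"])  # -> Answer instance
parse_answer(answer=["E", "F"])  # -> "Pydantic validation failed:\n..." (string)
parse_answer(answer=["A"])       # -> "Length mismatch in return:\n..." (string)
```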
def reset(self): - self.curr_step = 1 - self.terminated = False - self.answer = "" - - def step(self, action: str) -> tuple[str, bool, bool, bool, int]: - """ - One step running the environment - """ - reward: bool = False - action_type, arg = self.parse_action(action) - - logger.debug(f"Step {self.curr_step}: got action, {action_type}; arg {arg}") - - if action_type == "Finish": - self.answer = arg - - if reward := self.is_correct(): - obs = "Answer is CORRECT" - else: - obs = "Answer is INCORRECT" - - logger.info(f"Final answer given: {arg}, {obs}") - self.terminated = True - - elif action_type == "Search": - try: - # HACK: CDC Uses self-signed certs in resolution - with no_ssl_verification(): - obs = self.explorer.search(arg).strip("\n").strip() - except Exception as e: - logger.debug(e) - obs = "Could not find that page, please try another search." - - elif action_type == "Lookup": - try: - obs = self.explorer.lookup(arg).strip("\n").strip() - except Exception as e: - logger.debug(e) - obs = "The last page Searched was not found, so you cannot Lookup a keyword in it. Please try one of the similar pages given." - else: - obs = "Invalid Action. Valid Actions are Lookup[] Search[] and Finish[]." - - terminated = self.terminated - truncated = self.is_truncated() - - self.curr_step += 1 - - logger.info(f"Observed: {obs}; truncated: {truncated}; finished: {terminated}") - return (obs, reward, terminated, truncated, self.curr_step) - - def is_truncated(self) -> bool: - return self.curr_step >= self.max_steps - - @staticmethod - def parse_action(string: str) -> Union[tuple[str, str], tuple[None, None]]: - """ - 'action[argument]' -> ('action', 'argument') - """ - pattern = r"^(\w+)\[(.+)\]$" - match = re.match(pattern, string) - - if match: - action_type = match.group(1) - argument = match.group(2) - return action_type, argument - - else: - return None, None - - def is_correct(self) -> bool: - return self.normalize_answer(self.truth) == self.normalize_answer(self.answer) - - @staticmethod - def normalize_answer(ans: str) -> str: - def remove_articles(text): - return re.sub(r"\b(a|an|the)\b", " ", text) - - def white_space_fix(text): - return " ".join(text.split()) - - def remove_punc(text): - exclude = set(string.punctuation) - return "".join(ch for ch in text if ch not in exclude) - - def lower(text): - return text.lower() - - return white_space_fix(remove_articles(remove_punc(lower(ans)))) - - -class WikiQAEnvActive(WikiQAEnv): - """ - WikiQA Gymnasium with a Human-in-the-loop to judge whether response is correct - when evaluating final anwser - """ - - def is_correct(self) -> bool: - res = input( - f"===Answer===\n{self.normalize_answer(self.answer)}\n===\nIs answer correct? ([T]rue, [F]alse)" - ).lower() - - while True: - if res in ["true", "t"]: - res_bool = True - break - elif res in ["false", "f"]: - res_bool = False - break - else: - res = ( - input("Invalid option, please return either [t]rue/[f]alse") - .lower() - .strip() - ) - - return res_bool diff --git a/agents/meta.py b/agents/meta.py deleted file mode 100644 index d70f0cc..0000000 --- a/agents/meta.py +++ /dev/null @@ -1,166 +0,0 @@ -""" -Meta-algorithms and arrangements for agent systems -""" - -from .agent import Agent, ChunkedAgent, PersistentAgent -import logging -import openai -import re - -logger = logging.getLogger(__name__) - -class Reflexion(Agent): - """ - Implement a Reflexion-like algorithm for a number of loops. 
- This is assumed to be un-supervised, rather than a reinforcement-based approach where the answer is known apriori. - - The process is roughly: actor(start) - - :param start (str): A prompt or starting state to feed to the actor agent - :param actor (Agent): A language agent which produces the output desired - :param evaluator (Agent): A language agent that reviews the output of the actor agent each round and provides feedback for improvement - :param model_name (str): An Azure OpenAI deployment name to use for the two agents - :param n_rounds (int): The number of rounds that the actor and evaluator loop should be run - """ - max_steps : int - actor_proto_class : PersistentAgent - eval_proto_class : PersistentAgent - actor : PersistentAgent = None - evaluator : PersistentAgent = None - agent_answers : list[str] = [] - eval_answers : list[str] = [] - logger : logging.Logger - - def __init__(self, start: str, actor: PersistentAgent, evaluator: PersistentAgent, model_name: str, n_rounds: int = 3, llm: openai.OpenAI | None = None) -> None: - super().__init__(question=start, model_name=model_name, llm=llm) - self.max_steps = n_rounds - self.actor_proto_class = actor - self.eval_proto_class = evaluator - self.actor = self.actor_proto_class(question=self.format_prompt(), model_name=self.model_name, llm = self.llm) - self.logger = logger.getChild(self.__class__.__name__) - - def run(self, reset: bool = False) -> None: - if not self.truncated or self.terminated: - # Run action on start so each step/round starts with a reflection and ends with an action - self.logger.info("=== Starting Place ========") - self.scratchpad += "=== Starting Place ==========\n" - self.answer = self.actor() - self.agent_answers.append(self.answer) - self.scratchpad += self.actor.scratchpad - self.scratchpad += "\n=====================\n" - - return super().run(reset) - def step(self): - """ - Run 1 round of reflection + action - """ - self.logger.info(f"=== Round {self.curr_step} ===") - self.scratchpad += f"=== Round {self.curr_step} ===\n" - # === Reflect ================= - self.logger.info("Reflecting.") - self.scratchpad += f"=== Reflecting =========\n" - if self.evaluator is None: - # Init if we haven't yet - self.evaluator = self.eval_proto_class(question=self.answer, model_name=self.model_name, llm = self.llm) - else: - self.evaluator.add_observation(self.answer) - - self.evaluator.step() - reflection = self.evaluator.answer - - self.scratchpad += reflection - self.scratchpad += "\n=====================\n" - self.eval_answers.append(reflection) - - # === Act ===================== - self.logger.info("Acting.") - self.scratchpad += f"=== Acting =========\n" - self.actor.add_observation(reflection) - self.actor.step() - self.answer = self.actor.answer - self.scratchpad += self.answer - self.scratchpad += "\n=====================\n" - self.agent_answers.append(self.answer) - - self.truncated = self.actor.truncated or self.evaluator.truncated - self.terminated = self.curr_step == self.max_steps - self.curr_step += 1 - - def format_prompt(self) -> str: - """ - No formatting needed here, just return the original input. 
- """ - return self.question - - def reset(self) -> None: - self.agent_answers = [] - self.eval_answers = [] - super().reset() - -class ChunkedReflexion(ChunkedAgent): - max_steps : int - actor_proto_class : PersistentAgent - eval_proto_class : PersistentAgent - logger : logging.Logger - def __init__(self, start: str, actor: PersistentAgent, evaluator: PersistentAgent, model_name: str, n_rounds: int = 2, llm: openai.OpenAI | None = None, chunk_max: int = 3000): - super().__init__(question=start, model_name=model_name, llm=llm, chunk_max=chunk_max) - - self.actor_proto_class = actor - self.eval_proto_class = evaluator - self.max_steps = n_rounds - self.BASE_PROMPT = self.actor_proto_class.BASE_PROMPT - self.SYSTEM_PROMPT = self.actor_proto_class.SYSTEM_PROMPT - - self.logger = logger.getChild(self.__class__.__name__) - - def step(self): - """ - One step of the alg. Processes on one chunk at a time, running a Reflexion-style algorithm over each chunk. - """ - self.scratchpad += f"=== Chunk {self.curr_step} ========\n" - self.logger.info(f"=== Chunk {self.curr_step} =============") - step_input = self.format_prompt() - reflection_agent = Reflexion(step_input, self.actor_proto_class, self.eval_proto_class, self.model_name, n_rounds=self.max_steps, llm=self.llm) - - # Add previous chunk's translation as context - if (last_response := self.fetch_last_response()) is not None: - reflection_agent.actor.conversation_cache.append({ - "role": "user", - "content": "Here is the translation of the previous code chunk, for context:\n```python\n{}\n```".format(last_response["content"]) - }) - - step_answer = reflection_agent() - - self.scratchpad += reflection_agent.scratchpad - self.scratchpad += "\n=======================\n" - self.answer_cache.append(step_answer) - - self.terminated = len(self.question) == 0 - self.curr_step += 1 - - def format_prompt(self, split_expr: str = "\n{2,}?", join_str: str = "\n\n", **kwargs) -> str: - """ - Formatting BASE_QUERY, checking for output length and chunking self.question if necessary - - :param split_expr (str): A string or regex to pass to re.split() to split self.question into chunks. - :param join_str (str): A string to use to recompose chunks of self.question back together. - - NOTE: split_expr and join_str can be different (ex. '\\n{2, }?', and '\\n\\n'), - but join_str should always produce output that could be split on subsequent calls using split_expr. 
- """ - prompt_len = self.get_prompt_len() - - input_chunks = re.split(split_expr, self.question) - excess = [] - - # pop chunk by chunk until we have a message payload less than the requested max - while len(self.tokenizer.encode(join_str.join(input_chunks))) + prompt_len > self.chunk_max: - # Store excess message payload in question object - excess.append(input_chunks.pop()) - - # Reverse things around and re-join to string - # to get things the right way around - self.question = join_str.join(reversed(excess)) - - return join_str.join(input_chunks) - \ No newline at end of file diff --git a/agents/processors.py b/agents/processors.py new file mode 100644 index 0000000..6978a2c --- /dev/null +++ b/agents/processors.py @@ -0,0 +1,246 @@ +import asyncio +import logging +from abc import ABCMeta, abstractmethod +from itertools import islice +from typing import Iterable, Iterator, Tuple, Any, Sequence + +import openai +import polars as pl +from tqdm import tqdm + +from .agent import Agent +from .abstract import _Agent +from .generic import openai_creds_ad + +logger = logging.getLogger(__name__) + + +class _BatchProcessor(metaclass=ABCMeta): + """ + A virtual class for a processor that maps over a large iterable + where 1 output is expected for every 1 input + """ + def __init__(self, data: Iterable, agent_class: type[Agent], batch_size: int = 5, n_workers : int = 1, n_retry : int = 5, interactive : bool = False, **kwargs): + """ + A Processor which operates on chunks of an Iterable. + Each chunk must be independent, as state will not be maintained between agent calls. + + :param Iterable data: An object which will be split into chunks of `batch_size` and passed as-is to `agent_class` + :param Agent agent_class: An uninitialized class which will be used to process the batches of `data` (which it takes as a named argument, `batch`, for formatting) + :param int batch_size: Number of rows per batch of `data` + :param int n_workers: Number of workers to use during processing. 
>1 will process multiple batches concurrently as asyncio tasks + :param int n_retry: For each agent, how many round trips to try before giving up + :param bool interactive: Use interactive authentication (`True`) instead of ClientSecret authentication (not advised, for debugging) + :param kwargs: Additional named arguments passed to `agent_class` on init + """ + self.agent_class = agent_class + self.data = data + self.n_workers = n_workers + self.batch_size = batch_size + self.n_retry = n_retry + self.agent_kwargs = kwargs + self.interactive = interactive + + openai_creds_ad("Interactive" if self.interactive else "ClientSecret") + self.llm = openai.AsyncAzureOpenAI() + + # Parallel queues + # idx, retries remaining, batch + self.in_q : asyncio.PriorityQueue[Tuple[int, int, Any]] = asyncio.PriorityQueue() + # idx, response + self.out_q : asyncio.PriorityQueue[Tuple[int, Any]] = asyncio.PriorityQueue() + + def _load_inqueue(self): + """ + Load inqueue for parallel processing + """ + logger.info("Loading Inqueue") + for i, batch in enumerate(self._iter()): + self.in_q.put_nowait((i, self.n_retry, batch)) + + self.error_tasks = 0 + + @staticmethod + def dequeue(q: asyncio.Queue) -> list: + out = [] + while q.qsize() > 0: + try: + it = q.get_nowait() + out.append(it) + except asyncio.QueueEmpty: + break + return out + + @abstractmethod + def _iter(self) -> Iterator: + """ + Abstract method that should return self.data chunked by self.batch_size + """ + raise NotImplementedError() + + @abstractmethod + def _placeholder(self, batch: Any) -> Any: + """ + Abstract method which should return an appropriately sized placeholder data piece + that will be inserted in place of a real prediction if we encounter an error + """ + raise NotImplementedError() + + async def process(self): + """ + Process all samples from input data using language agent, splitting by chunk size specified at init + + :return Iterable: The predicted values after mapping over the input iterable (also stored in self.predicted) + """ + # Either the workers we called for at init or the number of batches we have to process + # (whichever is fewer) + self._load_inqueue() + n_workers = min(self.n_workers, self.in_q.qsize()) + logger.info(f"[process] processing {self.in_q.qsize()} queries across {n_workers} workers") + + self.pbar = tqdm(total=self.in_q.qsize(), desc="Batch Processing", unit="Batch") + workers = [] + + for i in range(n_workers): + workers.append(asyncio.create_task(self._worker(f"llm_worker-{i}", self.in_q, self.out_q), name=f"llm_worker-{i}")) + + # Wait until the queue is processed + await self.in_q.join() + + # Terminate workers + for worker in workers: + worker.cancel() + + await asyncio.gather(*workers, return_exceptions=True) + + self.pbar.close() + + if self.error_tasks > 0: + logger.warning(f"[process] There were {self.error_tasks} unsuccessful batches!") + + # De-queue into list from output queue + out = [] + for _, msg in self.dequeue(self.out_q): + if isinstance(msg, list): + out.extend(msg) + else: + out.append(msg) + + return out + + async def _worker(self, worker_name: str, in_q: asyncio.PriorityQueue, out_q: asyncio.PriorityQueue): + """ + Agent worker + """ + while True: + try: + (id, retry_left, data) = await in_q.get() + errored = False + agent = self._spawn_agent(data) + answer = await agent() + + if len(answer) == 0: + logger.error(f"[_worker - {worker_name}]: No answer was provided for query {id}") + errored = True + + except asyncio.CancelledError as e: + logger.info(f"[_worker - {worker_name}]: Got 
CancelledError, terminating.") + break + + except Exception as e: + logger.error(f"[_worker - {worker_name}]: Task {id} failed, {str(e)}") + errored = True + + if errored: + retry_left -= 1 + if retry_left < 0: + # End retries, fill in data with placeholder + logger.error(f"[_worker - {worker_name}]: Task {id} failed {self.n_retry} times and will not be retried") + answer = self._placeholder(data) + self.error_tasks += 1 + else: + # Send data back to queue to retry processing + logger.info(f"[_worker - {worker_name}]: Task {id} - {retry_left} retries remaining") + await in_q.put((id, retry_left, data)) + in_q.task_done() + continue + + await out_q.put((id, answer)) + self.pbar.update() + in_q.task_done() + + @staticmethod + def _batch_format(batch: Any) -> str: + """ + An optional formatter to convert batch into a str + """ + return str(batch) + + def _spawn_agent(self, batch: Iterable) -> Agent: + batch_str = self._batch_format(batch) + out = self.agent_class(llm=self.llm, **self.agent_kwargs, batch=batch_str) # type: ignore[misc] + return out + +class BatchProcessor(_BatchProcessor): + """ + A batch processor which maps an Agent over elements of an iterable (usually a list[str]). + + Each chunk of the iterable is operated on independently. + """ + def _iter(self) -> Iterator: + """ + Just a backport of itertools.batched + + Yields a batch of `self.data` of len `self.batch_size` until data is exhausted. + """ + iterator = iter(self.data) + while batch := tuple(islice(iterator, self.batch_size)): + yield batch + + def _placeholder(self, batch: Sequence): + """ + Returns a `List[str]` with `len() == len(batch)` + """ + resp_obj = "" if self.agent_class.output_len == 1 else ("", ) * self.agent_class.output_len + return [resp_obj] * len(batch) + +class DFBatchProcessor(_BatchProcessor): + """ + A Processor which operates on chunks of a polars dataframe. + Each chunk must be independent, as state will not be maintained between agent calls. + + The main user-facing method after init is :func:`process()` + """ + data : pl.DataFrame + + def __init__(self, data: pl.DataFrame, agent_class: type[Agent], batch_size: int = 5, n_workers : int = 1, n_retry : int = 5, interactive : bool = False, **kwargs): + """ + A Processor which operates on chunks of a polars dataframe. + Each chunk must be independent, as state will not be maintained between agent calls. + + :param pl.DataFrame data: A Data Frame object which will be split by `batch_size` rows and passed as-is to `agent_class` + :param Agent agent_class: An uninitialized class which will be used to process the chunks of `data` + :param int batch_size: Number of rows per batch of `data` + :param int n_workers: Number of workers to use during processing. 
>1 will spawn parallel workers using concurrent.futures + :param int n_retry: For each agent, how many round trips to try before giving up + :param bool interactive: Use interactive authentication (`True`) instead of ClientSecret authentication (not advised, for debugging) + :param kwargs: Additional named arguments passed to `agent_class` on init + """ + super().__init__(data, agent_class, batch_size, n_workers, n_retry, interactive, **kwargs) + + def _iter(self) -> Iterator[pl.DataFrame]: + return self.data.iter_slices(self.batch_size) + + def _placeholder(self, batch: pl.DataFrame): + """ + Returns a List[str] with len() == batch.height + """ + resp_obj = "" if self.agent_class.output_len == 1 else ("", ) * self.agent_class.output_len + return [resp_obj] * batch.height + + @staticmethod + def _batch_format(batch: pl.DataFrame) -> str: + """ + Write out batch argument as ndJSON formatted str to insert into BASE_PROMPT + """ + return batch.write_ndjson() \ No newline at end of file diff --git a/agents/react.py b/agents/react.py deleted file mode 100644 index aa91bdd..0000000 --- a/agents/react.py +++ /dev/null @@ -1,58 +0,0 @@ -""" -React DocExplorer -(From langchain, included here to avoid issues of deprecation) -https://raw.githubusercontent.com/langchain-ai/langchain/56e5aa4dd9fbd5efcf7836e7323a42867cdad3fc/libs/langchain/langchain/agents/react/base.py -""" -from typing import Optional - -from langchain_community.docstore.base import Docstore -from langchain_core.documents import Document - - -class DocstoreExplorer: - """Class to assist with exploration of a document store.""" - - def __init__(self, docstore: Docstore): - """Initialize with a docstore, and set initial document to None.""" - self.docstore = docstore - self.document: Optional[Document] = None - self.lookup_str = "" - self.lookup_index = 0 - - def search(self, term: str) -> str: - """Search for a term in the docstore, and if found save.""" - result = self.docstore.search(term) - if isinstance(result, Document): - self.document = result - return self._summary - else: - self.document = None - return result - - def lookup(self, term: str) -> str: - """Lookup a term in document (if saved).""" - if self.document is None: - raise ValueError("Cannot lookup without a successful search first") - if term.lower() != self.lookup_str: - self.lookup_str = term.lower() - self.lookup_index = 0 - else: - self.lookup_index += 1 - lookups = [p for p in self._paragraphs if self.lookup_str in p.lower()] - if len(lookups) == 0: - return "No Results" - elif self.lookup_index >= len(lookups): - return "No More Results" - else: - result_prefix = f"(Result {self.lookup_index + 1}/{len(lookups)})" - return f"{result_prefix} {lookups[self.lookup_index]}" - - @property - def _summary(self) -> str: - return self._paragraphs[0] - - @property - def _paragraphs(self) -> list[str]: - if self.document is None: - raise ValueError("Cannot get paragraphs without a document") - return self.document.page_content.split("\n\n") \ No newline at end of file diff --git a/agents/stopping_conditions.py b/agents/stopping_conditions.py new file mode 100644 index 0000000..dfffaf7 --- /dev/null +++ b/agents/stopping_conditions.py @@ -0,0 +1,51 @@ +""" +Stopping conditions for language agents +""" + +from .abstract import _StoppingCondition +from pydantic import BaseModel +from typing import Optional + +class StopOnStep(_StoppingCondition): + """ + Stop language agent on a given step + + :param int step: Number of steps the Agent should run before terminating + """ + + 
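For comparison, a custom stopping condition can be written against the same `_StoppingCondition` interface from `agents/abstract.py`; the `StopOnPhrase` class and its sentinel phrase below are illustrative assumptions, not part of the library:

```python
from typing import Optional

from agents.abstract import _StoppingCondition


class StopOnPhrase(_StoppingCondition):
    """Stop once the model's reply contains a sentinel phrase, returning that reply."""

    def __init__(self, phrase: str = "FINAL ANSWER:"):
        self.phrase = phrase

    def __call__(self, cls, response) -> Optional[str]:
        # Mirrors StopOnStep: `response` is the Choice returned by prompt_agent()
        content = response.message.content or ""
        return content if self.phrase in content else None
```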
def __init__(self, step: int = 1): + self.step = step + + def __call__(self, cls, response) -> Optional[str]: + if cls.curr_step >= self.step: + return response.message.content + else: + return None + +class StopOnDataModel(_StoppingCondition): + """ + A stopping condition that checks whether the final tool call was an instance of the provided class. + + Useful for terminating once we receive correctly parsed / structured output + """ + def __init__(self, answer_cls: type[BaseModel]): + self.answer_cls = answer_cls + + def __call__(self, cls, response) -> Optional[dict]: + if len(cls.tool_res_payload) and isinstance(cls.tool_res_payload[-1]["content"], self.answer_cls): + return cls.tool_res_payload[-1]["content"].model_dump() # pydantic.BaseModel.model_dump() -> dict[str, Any] + else: + return None + +class StopNoOp(_StoppingCondition): + """ + A stopping condition which always returns None. + + This is used when answer / stop handling is done internal to the Agent + """ + + def __init__(self): + pass + + def __call__(self, cls, response) -> None: + return None diff --git a/agents/tools.py b/agents/tools.py new file mode 100644 index 0000000..4717a84 --- /dev/null +++ b/agents/tools.py @@ -0,0 +1,39 @@ +import subprocess +import sys +import tempfile +from typing import Literal + + +def _subprocess_tool_call_on_file(tool_input: str, cmd_args: list[str], output_type: Literal["stdout", "file"] = "stdout") -> str: + """ + A helper function that writes `tool_input` to a file and runs a python module on that file, either returning stdout+stderr or the contents of the file after the subprocess call. + + Ex. As a tool call for an Agent to use mypy / black on device and return output + + :param tool_input (str): A string to pass as input to the tool (this is likely code) + :param cmd_args (list[str]): Command-line args between the python -m call and the file name (should include the python module to call and any additional arguments) + :param output_type (str): The output to return (either stdout+stderr, or contents of the tempfile, if this is modified) + + :return: Either stdout and stderr concatenated into a string and separated by a newline, or `tool_input` after calling the python module + """ + with tempfile.NamedTemporaryFile("w", delete=False) as file: + file.write(tool_input) + file.close() + + # Run the requested module on the temp file in a subprocess and capture stdout and stderr + subprocess_output = subprocess.run( + [sys.executable, "-m", *cmd_args, file.name], + capture_output=True, + text=True + ) + + if output_type == "stdout": + return "\n".join([subprocess_output.stdout, subprocess_output.stderr]) + elif output_type == "file": + with open(file.name, "r", encoding="utf-8") as f: + out = f.read() + + return out + else: + # Shouldn't be reachable + raise NotImplementedError() diff --git a/examples/agent_with_callback.py b/examples/agent_with_callback.py new file mode 100644 index 0000000..728b5bf --- /dev/null +++ b/examples/agent_with_callback.py @@ -0,0 +1,65 @@ +""" +Demo of simple Python code production Agent with an Evaluator Agent as a callback +Sean Browning +""" +import asyncio +import agents +import logging +import openai +import dotenv + +# NOTE: This loads in env vars for openAI +dotenv.load_dotenv() + +# Uncomment to see under the hood a bit more +# logging.basicConfig(level=logging.INFO) + +class DummyAgent(agents.Agent): + """ + A code producing agent with a simple prompt + """ + SYSTEM_PROMPT = "You are a language agent proficient in producing expressive and syntactically correct Python code."
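A sketch of how `_subprocess_tool_call_on_file` from `agents/tools.py` might be wrapped as an agent tool; the `call_ruff` wrapper and the `RUFF_TOOL` schema are illustrative assumptions rather than part of the package:

```python
from agents.tools import _subprocess_tool_call_on_file


def call_ruff(code: str) -> str:
    """Run `python -m ruff check` over the supplied code and return the linter output."""
    return _subprocess_tool_call_on_file(code, ["ruff", "check"], output_type="stdout")


# A matching OpenAI tool schema that an Agent subclass could register in TOOLS
RUFF_TOOL = {
    "type": "function",
    "function": {
        "name": "call_ruff",
        "description": "Run the ruff linter on Python code and return any findings",
        "parameters": {
            "type": "object",
            "properties": {
                "code": {"type": "string", "description": "Python code to lint"}
            },
            "required": ["code"],
        },
    },
}
```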
+ BASE_PROMPT = "Write a simple program to fit an ordinary least squares regression model to a polars DataFrame input. Include a worked example in your response and provide only the code in code fences (ex. ```python)" + +class DummyEvaluatorAgent(agents.Agent): + """ + An evaluator agent that will be called with the output of the above agent and give feedback on the response and the prompt + """ + SYSTEM_PROMPT = "You are a skilled project manager and evaluator of Python programs. You provide expert evaluation of whether code meets stated goals and reflect on how to improve the approach." + BASE_PROMPT = """ + The following Python program was produced by an AI language agent: + + {answer} + + Here is the full history of the requested program and the response from the AI languge agent in response: + + {scratchpad} + + Reflect on the performance of the AI language model and provide feedback on how the initial prompt and resulting python code could be improved. + """ + +if __name__ == "__main__": + + # Run this with Interactive OAuth + agents.openai_creds_ad("Interactive") + + llm = openai.AsyncAzureOpenAI() + + ag = DummyAgent( + model_name = "edav-api-share-gpt4-api-nofilter", + stopping_condition=agents.StopOnStep(1), + llm=llm, + callbacks=[ + agents.AgentCallback( + DummyEvaluatorAgent, + model_name="edav-api-share-gpt4-api-nofilter", + llm=llm, + stopping_condition=agents.StopOnStep(1) + ) + ], + oai_kwargs={"temperature": 1.0} + ) + + asyncio.run(ag()) + + print(f"Answer:\n{ag.answer}\n\nFeedback:\n{ag.callback_output[0]}") \ No newline at end of file diff --git a/examples/batch_processing.py b/examples/batch_processing.py new file mode 100644 index 0000000..6308170 --- /dev/null +++ b/examples/batch_processing.py @@ -0,0 +1,114 @@ +""" +Demo of batch processing + +An extension of the prior structured prediction example where we produce 20 Q this time +and send to agents to solve in batches of 5 + +Sean Browning +""" + +import asyncio +import agents +from pydantic import BaseModel, Field +from typing import List, Literal +import logging +import dotenv +import openai + +N_QUESTIONS = 20 +BATCH_SIZE = 5 + +# NOTE: This loads in env vars for openAI +dotenv.load_dotenv() + +logger = logging.getLogger(__name__) + +logging.basicConfig(level=logging.INFO) + +class Answer(BaseModel): + """ + Response we expect from question answering agent + """ + answer: List[Literal["A", "B", "C", "D"]] = Field(description="Answer to the question") + +class QuestionandAnswer(Answer): + """ + The response body we expect question producing agent (Q + A) + """ + question: List[str] = Field(description="Question text that includes the question and response options, but NOT the answer") + +class QAGenerator(agents.StructuredOutputAgent): + """ + A language agent which produces questions to be answered + """ + + SYSTEM_PROMPT = """ + You are a language agent proficient in crafting multiple choice questions. + You are competing against another lanaguage agent, so you only produce difficult questions that you think would stump another AI model. + """ + + BASE_PROMPT = """ + Write {n_questions} short multiple choice questions on whatever topic you choose and supply their correct answers. + + For each question, there should be 4 possible answer choices: A, B, C, and D. + + You will supply the question text and the answer as parameters to a function call. + The function call accepts an array for each argument, so pass all questions and answers in the same function call and ensure their indices align. 
+ + For this task, perform two sequential actions: + 1. Think through step-by-step how you would craft a question to stump an AI model of the same level as yourself + 2. Send your questions and answers via function call + + Send each action as a separate response + """ + +class QAnswerer(agents.StructuredOutputAgent): + SYSTEM_PROMPT = "You are a language agent proficient in answering multiple choice questions." + # NOTE: {batch} is important here, since it's passed implicitly by the processor + BASE_PROMPT = """ + Answer each multiple choice question using a function call. + The function call accepts an array for each argument, so pass all answers in the same function call and ensure their indices align. + + {batch} + """ + +if __name__ == "__main__": + # Run this with Interactive OAuth + agents.openai_creds_ad("Interactive") + + llm = openai.AsyncAzureOpenAI() + + ag = QAGenerator( + response_model=QuestionandAnswer, + model_name = "edav-api-share-gpt4-api-nofilter", + llm = llm, + oai_kwargs={"temperature": 1.0}, + n_questions=N_QUESTIONS + ) + + # Generate our questions + asyncio.run(ag()) + + print(ag.answer) + + proc = agents.BatchProcessor( + data=ag.answer["question"], + agent_class=QAnswerer, + batch_size=BATCH_SIZE, + n_workers=3, + response_model=Answer, + model_name = "edav-api-share-gpt4-api-nofilter", + oai_kwargs={"temperature": 0.0} + ) + + predicted_answers_raw = asyncio.run(proc.process()) + + predicted_answers = [] + + for batch in predicted_answers_raw: + predicted_answers.extend(batch["answer"]) + + print(predicted_answers) + + n_correct = sum(expected == predicted for expected, predicted in zip(ag.answer["answer"], predicted_answers)) + print(f"{n_correct} / {len(ag.answer['answer'])} Correct") \ No newline at end of file diff --git a/examples/main.py b/examples/main.py deleted file mode 100644 index b64b781..0000000 --- a/examples/main.py +++ /dev/null @@ -1,134 +0,0 @@ -import argparse -import os -import logging - -import openai -from azure.identity import DeviceCodeCredential -from dotenv import load_dotenv - -import agents - -load_dotenv() - -# === Service Principal auth ======================== -# === Get creds, set env vars ================================= -credential = DeviceCodeCredential() -os.environ["AZURE_OPENAI_API_KEY"] = credential.get_token( - "https://cognitiveservices.azure.com/.default" -).token -os.environ["AZURE_OPENAI_ENDPOINT"] = os.getenv("GPT4_URL") - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument("--question", "-q", required=True, type=str) - parser.add_argument("--truth", "-t", required=True, type=str) - parser.add_argument("--examples", "-e", type=str) - parser.add_argument("--n_trials", "-n", type=int, default=5) - parser.add_argument("--n_steps", type=int, default=5) - parser.add_argument( - "--model", "-m", type=str, default="edav-api-share-gpt4-api-nofilter" - ) - parser.add_argument( - "--agent", "-a", choices=["react", "reflexion"], type=str, default="react" - ) - parser.add_argument( - "--reflection_strategy", - "-s", - choices=["last_attempt", "reflexion", "last_attempt_reflexion"], - default="last_attempt_reflexion", - ) - parser.add_argument("--debug", action="store_true", default=False) - parser.add_argument("--log_path", default="out.log", type=str) - args = parser.parse_args() - - # Add hard-coded example option if none are passed - if args.examples is None: - examples = """Question: Which of Jonny Craig and Pete Doherty has been a member of more bands ? 
-Thought 1: I need to search Jonny Craig and Pete Doherty, find the number of bands they have been a member of, then find which one has been a member of more bands. -Action 1: Search[Jonny Craig] -Observation 1: Jonathan Monroe Craig (born March 26, 1986) is a Canadian-American singer and songwriter, best known as the former lead vocalist of bands Dance Gavin Dance, Emarosa, Rain City Drive (previously known as Slaves), as well as the co-lead vocalist of the short-lived supergroup Isles & Glaciers. As a solo artist, he has released two studio albums, two EPs and a live album to date, and his distinct type of soul-based singing has earned him considerable acclaim. -Thought 2: Jonny Craig has been a member of four bands. I need to search Pete Doherty next and find the number of bands he has been a member of. -Action 2: Search[Pete Doherty] -Observation 2: Peter Doherty (born 12 March 1979) is an English musician, songwriter, actor, poet, writer, and artist. He is best known for being co-frontman of The Libertines, which he formed with Carl Barât in 1997. His other musical projects are indie band Babyshambles and Peter Doherty and the Puta Madres. -Thought 3: Pete Doherty has been a member of three bands. So Jonny Craig has been a member of more bands. -Action 3: Finish[Jonny Craig] -Observation 3: Answer is CORRECT -Correct answer: Jonny" Craig - -Question: The creator of "Wallace and Gromit" also created what animation comedy that matched animated zoo animals with a soundtrack of people talking about their homes? -Thought 1: I need to search "Wallace and Gromit" and find the animation comedy that matched animated zoo animals with a soundtrack of people talking about their homes. -Action 1: Search["Wallace and Gromit"] -Observation 1: Could not find ["Wallace and Gromit"]. Similar: ['Wallace and Gromit', 'Wallace & Gromit: The Curse of the Were-Rabbit', 'A Close Shave', 'A Grand Day Out', 'The Wrong Trousers', 'A Matter of Loaf and Death', 'Nick Park', 'Aardman Animations', 'Timmy Time', "Wallace and Gromit's Cracking Contraptions"] -Thought 2: To find the animation comedy, I can search Nick Park. -Action 2: Search[Nick Park] -Observation 2: Nicholas Wulstan Park (born 6 December 1958) is a British animator who created Wallace and Gromit, Creature Comforts, Chicken Run, Shaun the Sheep, and Early Man. Park has been nominated for an Academy Award a total of six times and won four with Creature Comforts (1989), The Wrong Trousers (1993), A Close Shave (1995) and Wallace & Gromit: The Curse of the Were-Rabbit (2005).He has also received five BAFTA Awards, including the BAFTA for Best Short Animation for A Matter of Loaf and Death, which was also the most watched television programme in the United Kingdom in 2008. His 2000 film Chicken Run is the highest-grossing stop motion animated film.For his work in animation, in 2012, Park was among the British cultural icons selected by artist Peter Blake to appear in a new version of Blake's most famous artwork—the Beatles' Sgt. Pepper's Lonely Hearts Club Band album cover—to celebrate the British cultural figures of his life. -Thought 3: Nick Park created Wallace and Gromit, Creature Comforts, Chicken Run, Shaun the Sheep, and Early Man. The animation comedy that matched animated zoo animals with a soundtrack of people talking about their homes is Creature Comforts. 
-Action 3: Finish[Creature Comforts] -Observation 3: Answer is CORRECT -Correct answer: Creature Comforts""" - else: - examples = args.examples - if args.debug: - # Add debug logging - logger = logging.getLogger() - logger.setLevel(logging.DEBUG) - handle = logging.FileHandler(args.log_path) - handle.setLevel(logging.DEBUG) - logger.addHandler(handle) - - # Initialize environment / gymnasium - envir = agents.WikiQAEnv( - question=args.question, truth=args.truth, max_steps=args.n_steps - ) - - # Initialize LLM - llm = openai.AzureOpenAI() - - # Initialize Agent - if args.agent == "react": - agent = agents.ReactAgent( - question=args.question, - examples=examples, - model_name=args.model, - llm=llm, - env=envir, - ) - elif args.agent == "reflexion": - agent = agents.ReactandReflectAgent( - question=args.question, - examples=examples, - reflection_strategy=args.reflection_strategy, - model_name=args.model, - llm=llm, - env=envir, - ) - else: - raise NotImplementedError(f"Unknown agent class, {args.agent}") - - trial_results = [] - - for i in range(args.n_trials): - # If we got it right, stop running - if agent.correct: - break - # else, keep working - if args.agent != "reflexion": - # Only Reflexion stores - # memory between trials - agent.reset() - agent.run() - - if args.debug: - # Dump full scratchpad - logger.debug("Trial complete. Full Scratchpad:\n" + agent.scratchpad) - else: - print("Trial complete. Full Scratchpad:\n" + agent.scratchpad) - - trial_results.append(agent.correct) - - print(f"Successful trials: {sum(trial_results)} / {len(trial_results)}") - - -if __name__ == "__main__": - main() diff --git a/examples/prompt_refine.py b/examples/prompt_refine.py deleted file mode 100644 index a409fcf..0000000 --- a/examples/prompt_refine.py +++ /dev/null @@ -1,193 +0,0 @@ -import argparse -import os -import re -import math -import statistics -from io import StringIO -from pathlib import Path -from typing import Callable, Iterable - -import polars as pl -import textgrad as tg -from dotenv import load_dotenv -from textgrad.autograd import Module, StringBasedFunction -from textgrad.engine.openai import AzureChatOpenAI - -import agents - -load_dotenv() - -MODEL = "edav-api-share-gpt4-api-nofilter" -STARTING_SYSTEM_PROMPT = """ - You are a helpful language agent that takes malformed CSV input and returns the corrected input. - Rows may have extra empty columns added or have un-quoted content that leads to a parse error. 
- Try to remedy these errors and return the input to be parsed via polars.read_csv() -""".strip() -N_EPOCHS = 3 -BATCH_SIZE = 1 -# === Get creds, set env vars ================================= -agents.openai_creds_ad() -os.environ["AZURE_OPENAI_ENDPOINT"] = os.getenv("GPT4_URL") - - -class CSVLoss(Module): - """ - A Hacky CSV comparison loss FN - """ - - def __init__(self) -> None: - super().__init__() - self.call_fn = StringBasedFunction( - fn=self._eval_df, function_purpose="Evaluate CSV response to ground truth" - ) - - @staticmethod - def extract_csv(input_str: str) -> str: - """ - Extract just the CSV content from a body of text, assuming it's properly denoted within a ```csv block - """ - csv_re = re.compile(r"```[cC]sv\n(.+?)```", re.DOTALL) - return csv_re.findall(input_str)[0] - - @classmethod - def _eval_df(cls, input_val: tg.Variable, truth: tg.Variable) -> str: - """ - Evaluate if model response x matches the expected label y - """ - try: - x_df = pl.read_csv(StringIO(cls.extract_csv(input_val.get_value()))) - except pl.exceptions.ComputeError as e: - if "found more fields than defined" in str(e): - out = f"Parsing input CSV via `polars.read_csv()` failed because there were >=1 rows with more fields than columns in the CSV header" - return out - except Exception as e: - # Early return if this still fails to parse - out = f"Parsing input CSV via `polars.read_csv()` failed with following exception: {str(e)}" - return out - - y_df = pl.read_csv(StringIO(cls.extract_csv(truth.get_value()))) - - if x_df.equals(y_df): - out = "Answer Correct." - else: - out = "Answer parses correctly via `polars.read_csv()`, but doesn't match expected value" - - return out - - def forward(self, **kwargs): - return self.call_fn(inputs=kwargs) - - -def run_epoch(x: tg.Variable, y: tg.Variable, model: tg.BlackboxLLM, eval_fn: Callable): - """Main Epoch logic""" - answer = model(x) - loss = eval_fn(answer, y) - loss.backward() - -class CSVloads: - def __init__(self, root_path: os.PathLike, split: float = 0.5) -> None: - pattern = re.compile(f"(\d)+.csv") - # pattern is /d.csv , /d_correct.csv - # so find highest /d.csv file - self.root_path = Path(root_path) - self.n_examples = int(pattern.findall([file_path.name for file_path in self.root_path.iterdir() if pattern.match(file_path.name)][-1])[0]) - self.examples = [] - for i in range(self.n_examples): - with open(self.root_path / f"{i + 1}.csv", "r", encoding="utf-8") as file: - incorrect_file = tg.Variable( - f"```csv\n{file.read()}```", - requires_grad=False, - role_description="Input CSV with syntax errors", - ) - with open(self.root_path / f"{i + 1}_correct.csv", "r", encoding="utf-8") as file: - correct_file = tg.Variable( - f"```csv\n{file.read()}```", - requires_grad=False, - role_description="Correct answer", - ) - self.examples.append((incorrect_file, correct_file)) - - self.train_len = math.floor(split * self.n_examples) - - def train_iter(self): - for example in self.examples[:self.train_len]: - yield example - - def validation_iter(self): - for example in self.examples[self.train_len:]: - yield example - -class Validation: - results : dict[str, any] - def __init__(self, batch_size: int = 2) -> None: - self.results = {key: [] for key in ["prompt", "train_acc", "val_acc"]} - self.batch_size = batch_size - - def _eval_sample(self, sample, model: Module, loss: Module): - answer = model(sample[0]) - loss_val = loss(input_val=answer, truth=sample[1]) - - if loss_val.value == "Answer Correct.": - return 1 - else: - return 0 - def evaluate(self, 
samples: Iterable, prompt: tg.variable, model: Module, loss: Module): - val_results = [] - self.results["prompt"] = prompt.value - for i, sample in enumerate(samples): - val_results.append(self._eval_sample(sample, model, loss)) - - self.results["val_acc"].append(statistics.mean(val_results)) - - if len(self.results["val_acc"]) > 1 and (self.results["val_acc"][-1] < self.results["val_acc"][-2]): - # Revert prompt if accuracy doesn't improve - prompt.set_value(self.results["prompt"][-2]) - - -def main(): - data_path = Path("data", "csv_corrections") - - csv_data = CSVloads(data_path) - - # === Set up LLM Agents ================= - eng = AzureChatOpenAI(MODEL) - eng_backprop = AzureChatOpenAI(MODEL) - tg.set_backward_engine(eng_backprop, override=True) - system_prompt = tg.Variable( - STARTING_SYSTEM_PROMPT, - requires_grad=True, - role_description="System prompt for a somewhat capable language model that specifies behaviors and strategies for CSV structure correction", - ) - optimizer = tg.TextualGradientDescent( - engine=eng_backprop, parameters=[system_prompt] - ) - loss_fn = CSVLoss() - - model = tg.BlackboxLLM(eng, system_prompt=system_prompt) - validator = Validation(BATCH_SIZE) - # Run for N epochs (+1 for starting place) - for i in range(N_EPOCHS + 1): - print(f"=== Round {i} ===============") - # Train - losses = [] - optimizer.zero_grad() - for i, (question, correct) in enumerate(csv_data.train_iter()): - answer = model(question) - loss = loss_fn(input_val=answer, truth=correct) - losses.append(loss) - # BUG: Any more than 2 samples and we run out of tokens - if i > 0 and i % BATCH_SIZE == 0: - total_loss = tg.sum(losses) - total_loss.backward() - optimizer.step() - losses = [] - optimizer.zero_grad() - - validator.evaluate(csv_data.validation_iter(), system_prompt, model, loss_fn) - print(f"current prompt: {system_prompt}") - print(f"========================") - print(validator.results) - - -if __name__ == "__main__": - main() diff --git a/examples/sas_conversion.py b/examples/sas_conversion.py deleted file mode 100644 index 98e1d3d..0000000 --- a/examples/sas_conversion.py +++ /dev/null @@ -1,48 +0,0 @@ -""" -Testing Conversion of SAS files to Python using system of language agents -""" -import logging -from argparse import ArgumentParser -from dotenv import load_dotenv - -from agents.agent import * -from agents.meta import ChunkedReflexion -load_dotenv() - -logger = logging.basicConfig(filename="sas_conversion.log", filemode="w", level=logging.INFO) - -if __name__ == "__main__": - parser = ArgumentParser( - prog="SAS -> Python Conversion via LLM agents", - description=""" - This program converts an input SAS file into a rough outline of a Python script that might replace the functionality. 
-        It's unlikely that the resulting script will work out-of-the-box, but it should provide a basic structure to start with
-        """
-    )
-    parser.add_argument("-i", "--input", type=str, default="data/INPUT - GEN LAB 1 initial LOINC pull and data clean (5).sas", help="SAS file input path")
-    parser.add_argument("-o", "--output", type=str, default="out.py", help="Python file output path (default: out.py)")
-    parser.add_argument("-m", "--model", type=str, default="edav-chatapp-share-gpt4-32k-tpm25kplus-v0613-dfilter", help="Name of model deployment to use for language agents (default: edav-chatapp-share-gpt4-32k-tpm25kplus-v0613-dfilter)")
-    parser.add_argument("--device_code", action="store_true", help="Instead of using Service Principal creds, utilize Azure Device Code authorization flow for Azure OpenAI")
-    parser.add_argument("--reflection_rounds", type=int, default=2, help="Number of rounds of Reflexion to refine the output")
-    args = parser.parse_args()
-
-    with open(args.input, "r") as file:
-        sas_file_content = file.read().strip()
-
-    if args.device_code:
-        # TODO: Implement
-        pass
-
-    reflection_pipeline = ChunkedReflexion(
-        sas_file_content,
-        actor=SAStoPythonAgent,
-        evaluator=PythonReflexionAgent,
-        model_name=args.model,
-        n_rounds=args.reflection_rounds,
-        chunk_max=1500
-    )
-
-    refine_agent = PythonRefineAgent(reflection_pipeline(), args.model)
-
-    with open(args.output, "w") as file:
-        file.write(refine_agent())
diff --git a/examples/structured_prediction.py b/examples/structured_prediction.py
new file mode 100644
index 0000000..6a85a68
--- /dev/null
+++ b/examples/structured_prediction.py
@@ -0,0 +1,88 @@
+"""
+Demo of structured returns and predictions using two agents:
+- One QAGenerator agent which returns some questions to be answered along with their correct answers
+- One QAnswerer agent, which attempts to answer the questions
+
+Sean Browning
+"""
+import asyncio
+import agents
+from pydantic import BaseModel, Field
+from typing import List, Literal
+import logging
+import dotenv
+import openai
+
+# NOTE: This loads in env vars for openAI
+dotenv.load_dotenv()
+
+logger = logging.getLogger(__name__)
+
+logging.basicConfig(level=logging.INFO)
+
+class Answer(BaseModel):
+    """
+    Response we expect from question answering agent
+    """
+    answer: List[Literal["A", "B", "C", "D"]] = Field(description="Answer to the question")
+
+class QuestionandAnswer(Answer):
+    """
+    The response body we expect question producing agent (Q + A)
+    """
+    question: List[str] = Field(description="Question text that includes the question and response options, but NOT the answer")
+
+
+class QAGenerator(agents.StructuredOutputAgent):
+    """
+    A language agent which produces questions to be answered
+    """
+
+    SYSTEM_PROMPT = "You are a language agent proficient in crafting multiple choice questions"
+    BASE_PROMPT = """
+    Write {n_questions} short multiple choice questions on a general topic and supply their correct answers.
+
+    For each question, there should be 4 possible answer choices: A, B, C, and D.
+
+    You will supply the question text and the answer as parameters to a function call.
+    The function call accepts an array for each argument, so pass all questions and answers in the same function call and ensure their indices align.
+    """
+
+class QAnswerer(agents.StructuredOutputAgent):
+    SYSTEM_PROMPT = "You are a language agent proficient in answering multiple choice questions."
+    BASE_PROMPT = """
+    Answer each multiple choice question using a function call.
+    The function call accepts an array for each argument, so pass all answers in the same function call and ensure their indices align.
+
+    {questions}
+    """
+
+if __name__ == "__main__":
+    # Run this with Interactive OAuth
+    agents.openai_creds_ad("Interactive")
+
+    llm = openai.AsyncAzureOpenAI()
+
+    ag = QAGenerator(
+        response_model=QuestionandAnswer,
+        model_name = "edav-api-share-gpt4-api-nofilter",
+        llm = llm,
+        oai_kwargs={"temperature": 1.0},
+        n_questions=5
+    )
+
+    asyncio.run(ag())
+
+    print(ag.answer)
+
+    ag2 = QAnswerer(
+        response_model=Answer,
+        model_name = "edav-api-share-gpt4-api-nofilter",
+        llm = llm,
+        oai_kwargs={"temperature": 1.0},
+        questions=ag.answer["question"]
+    )
+
+    asyncio.run(ag2())
+
+    print(ag2.answer)
\ No newline at end of file
diff --git a/examples/textgrad_test.py b/examples/textgrad_test.py
deleted file mode 100644
index 3e297b8..0000000
--- a/examples/textgrad_test.py
+++ /dev/null
@@ -1,60 +0,0 @@
-import argparse
-import os
-import textgrad as tg
-from textgrad.engine.openai import AzureChatOpenAI
-from dotenv import load_dotenv
-import agents
-load_dotenv()
-
-# === Get creds, set env vars =================================
-agents.openai_creds_ad()
-os.environ["AZURE_OPENAI_ENDPOINT"] = os.getenv("GPT4_URL")
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser()
-    parser.add_argument("-n", "--n_trials", type=int, default=2)
-    parser.add_argument("-m", "--model", type=str, default="edav-api-share-gpt4-api-nofilter")
-    args = parser.parse_args()
-
-    # === Set up engines =================
-    eng = AzureChatOpenAI(args.model)
-    eng_backprop = AzureChatOpenAI(args.model)
-    tg.set_backward_engine(eng_backprop, override=True)
-
-    model = tg.BlackboxLLM(eng)
-
-    # === Run ====================================================
-    question_string = ("If it takes 1 hour to dry 25 shirts under the sun, "
-                       "how long will it take to dry 30 shirts under the sun? "
-                       "Reason step by step")
-
-    question = tg.Variable(question_string,
-                           role_description="question to the LLM",
-                           requires_grad=False)
-
-    answer = model(question)
-
-    answer.set_role_description("concise and accurate answer to the question")
-
-    # Step 2: Define the loss function and the optimizer, just like in PyTorch!
-    # Here, we don't have SGD, but we have TGD (Textual Gradient Descent)
-    # that works with "textual gradients".
-    optimizer = tg.TGD(parameters=[answer])
-    evaluation_instruction = (f"Here's a question: {question_string}. Here's the correct answer: 1 hour."
-                              "Evaluate any given answer to this question, "
-                              "be smart, logical, and very critical. "
-                              "Just provide concise feedback.")
-
-
-    # TextLoss is a natural-language specified loss function that describes
-    # how we want to evaluate the reasoning.
-    loss_fn = tg.TextLoss(evaluation_instruction, engine=eng_backprop)
-
-    for i in range(args.n_trials):
-        print(f"====Round {i + 1} Answer====\n\n{answer}")
-
-        loss = loss_fn(answer)
-        loss.backward()
-        optimizer.step()
-
-        print(f"\nLoss: {loss.value}\n\n")
diff --git a/requirements.txt b/requirements.txt
index 9d2fc88..31334ba 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,13 +1,7 @@
 azure-core==1.30.1
 azure-identity==1.16.0
 backoff==2.2.1
-gymnasium==0.29.1
-langchain==0.2.1
-langchain-community==0.2.1
-langchain-core==0.2.3
-openai>=1.30.5
+openai>=1.40.0
 polars>=1.6.0
 pydantic>=2.9.2
-python-dotenv==1.0.1
-tiktoken==0.7.0
-wikipedia==1.4.0
+tqdm
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 08c3a27..9d60559 100644
--- a/setup.py
+++ b/setup.py
@@ -8,7 +8,7 @@
 setup(
     name='agents',
-    version='0.2',
+    version='0.3.0-alpha.1',
     description='Yet another langchain-esque package to use language agents and compose agent-systems',
     long_description=readme,
     long_description_content_type="text/markdown",
@@ -19,5 +19,7 @@
     license='MIT',
     packages=find_packages(include=["agents", "agents.*"]),
     install_requires=requirements,
-    zip_safe=False
+    tests_require=[
+        "pytest",
+    ]
 )
\ No newline at end of file