diff --git a/examples/PoT/README.md b/examples/PoT/README.md
new file mode 100644
index 00000000..5c84fb2e
--- /dev/null
+++ b/examples/PoT/README.md
@@ -0,0 +1,125 @@
+# Program of Thought (PoT) Example
+
+Program of Thought (PoT) is an approach that combines language models with code execution to solve complex reasoning tasks. The model generates executable code that breaks a problem down into computational steps, providing a more structured and verifiable solution path than free-form reasoning.
+
+This example demonstrates how to use the framework for Program of Thought tasks. The example code can be found in the `examples/PoT` directory.
+
+```bash
+   cd examples/PoT
+```
+
+## Overview
+
+This example implements a Program of Thought (PoT) workflow that consists of the following components:
+
+1. **Input Interface**
+   - Handles user input containing questions
+   - Manages example prompts for few-shot learning
+   - Processes additional options for the workflow
+
+2. **PoT Workflow**
+   - Takes the user's question and example prompts
+   - Generates executable Python code to solve the problem
+   - Executes the code to obtain numerical answers
+   - Provides step-by-step reasoning through code
+
+3. **PoT Executor**
+   - Executes the generated Python code in a safe environment
+   - Returns the computed answer
+
+4. **Choice Extractor**
+   - Compares the executed result with available choices
+   - Returns the choice that most closely matches the result
+   - Handles numerical and textual comparison to find the best match
+
+![PoT Workflow](./docs/images/pot_workflow.jpg)
+
+## Prerequisites
+
+- Python 3.10+
+- Required packages installed (see requirements.txt)
+- Access to OpenAI API or compatible endpoint (see configs/llms/gpt.yml)
+- Redis server running locally or remotely
+- Conductor server running locally or remotely
+
+## Configuration
+
+The container.yaml file manages dependencies and settings for the system's components, including the Conductor connection, the Redis connection, and other services. To set up your configuration:
+
+1. Generate the container.yaml file:
+   ```bash
+   python compile_container.py
+   ```
+   This will create a container.yaml file with default settings under `examples/PoT`.
+
+
+2. Configure your LLM settings in `configs/llms/gpt.yml`:
+   - Set your OpenAI API key or compatible endpoint through environment variables or by directly modifying the yml file
+   ```bash
+   export custom_openai_key="your_openai_api_key"
+   export custom_openai_endpoint="your_openai_endpoint"
+   export model_id="your_model_id" # e.g. gpt-4, gpt-3.5-turbo
+   ```
+   - Configure other model settings such as temperature as needed, through environment variables or by directly modifying the yml file
+
+3. Update settings in the generated `container.yaml`:
+   - Modify Redis connection settings:
+     - Set the host, port and credentials for your Redis instance
+     - Configure the `redis_stream_client` sections
+   - Update the Conductor server URL under the conductor_config section
+   - Adjust any other component settings as needed
+
+4. [Optional] Prepare example prompts:
+   - Create a text file containing example math problems and their Python solutions
+   - Use the format shown in eval_gsm8k_fewshot.py and in the sample below
+   - Pass the file path using --examples when running evaluation
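+
+   For reference, each example pairs a question with a short Python solution that stores its result in a variable named `ans`. A minimal examples file looks like this (taken from the default examples in `eval_gsm8k_fewshot.py`):
+   ```text
+   Question: If there are 3 cars in the parking lot and 2 more cars arrive, how many cars are in the parking lot?
+   # Python code, return ans
+   total_cars = 3
+   more_arrived_cars = 2
+   ans = total_cars + more_arrived_cars
+   ```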
+
+## Running the Example
+
+Run the Program of Thought (PoT) example:
+
+For terminal/CLI usage:
+```bash
+python run_cli.py
+```
+
+For web interface usage:
+```bash
+python run_webpage.py
+```
+
+For evaluating on the GSM8K dataset with few-shot examples:
+```bash
+python eval_gsm8k_fewshot.py \
+    --endpoint "https://api.openai.com/v1" \
+    --api_key "your_openai_api_key" \
+    --model_id "gpt-3.5-turbo" \
+    --dataset_path "gsm8k_test.jsonl" \
+    --examples "examples.txt" \
+    --output_path "output"
+```
+The `--examples` flag is optional; omit it to use the built-in default examples.
+
+For evaluating on the AQUA dataset with zero-shot learning:
+```bash
+python eval_aqua_zeroshot.py \
+    --endpoint "https://api.openai.com/v1" \
+    --api_key "your_openai_api_key" \
+    --model_id "gpt-3.5-turbo" \
+    --dataset_path "aqua_test.jsonl" \
+    --output_path "output"
+```
+
+The evaluation scripts will:
+- Process questions from the dataset using the Program of Thought approach
+- Generate Python code solutions for each question
+- Save results to JSON files in the specified output directory
+- Include metrics such as prompt tokens and completion tokens
+
+
+## Troubleshooting
+
+If you encounter issues:
+- Verify Redis is running and accessible
+- Check that your OpenAI API key is valid
+- Ensure all dependencies are installed correctly
+- Review logs for any error messages
+- **Open an issue on GitHub if you can't find a solution; we will do our best to help you out!**
diff --git a/examples/PoT/__init__.py b/examples/PoT/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/examples/PoT/agent/input_interface/__init__.py b/examples/PoT/agent/input_interface/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/examples/PoT/agent/input_interface/input_interface.py b/examples/PoT/agent/input_interface/input_interface.py
new file mode 100644
index 00000000..78bbea12
--- /dev/null
+++ b/examples/PoT/agent/input_interface/input_interface.py
@@ -0,0 +1,58 @@
+from pathlib import Path
+
+from omagent_core.utils.registry import registry
+from omagent_core.utils.general import read_image
+from omagent_core.engine.worker.base import BaseWorker
+from omagent_core.utils.logger import logging
+
+
+CURRENT_PATH = Path(__file__).parents[0]
+
+
+@registry.register_worker()
+class PoTInputInterface(BaseWorker):
+    """Input interface processor that handles user questions and example inputs.
+
+    This processor manages the interactive input collection process for math problem solving:
+    1. Collects a math word problem from the user
+    2. Optionally collects example problems/solutions for few-shot learning
+    3. Optionally collects multiple choice options if applicable
+    4. Validates and formats all inputs appropriately
+    5. Returns a structured dictionary containing the processed inputs
+
+    The interface is designed to be flexible, allowing both basic question-only
+    inputs as well as more complex scenarios with examples and multiple choice options.
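+
+    Example of the structure returned by _run (illustrative values):
+        {'query': 'If x + 2 = 5, what is x?', 'examples': None, 'options': None}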
+ """ + + def _run(self, *args, **kwargs): + # Prompt user for the main math question and extract text content + input = self.input.read_input(workflow_instance_id=self.workflow_instance_id, input_prompt='Please input a math related question:') + content = input['messages'][-1]['content'] + for content_item in content: + if content_item['type'] == 'text': + query = content_item['data'] + + # Collect optional example problems/solutions for few-shot learning + # User can input "None" to skip this step + input = self.input.read_input(workflow_instance_id=self.workflow_instance_id, input_prompt='Please input examples if you have, input "None" if you do not have:') + content = input['messages'][-1]['content'] + for content_item in content: + if content_item['type'] == 'text': + examples = content_item['data'] + if examples == 'None': + examples = None + + # Collect optional multiple choice options if this is a multiple choice question + # User can input "None" for standard numerical answer questions + input = self.input.read_input(workflow_instance_id=self.workflow_instance_id, input_prompt='Please input options if you are doing a multiple choice question, input "None" if you do not have:') + content = input['messages'][-1]['content'] + for content_item in content: + if content_item['type'] == 'text': + options = content_item['data'] + if options == 'None': + options = None + + # Return all collected inputs in a structured format + inputs = {'query': query, 'examples': examples, 'options': options} + logging.info(inputs) + return inputs diff --git a/examples/PoT/compile_container.py b/examples/PoT/compile_container.py new file mode 100644 index 00000000..6d77e81c --- /dev/null +++ b/examples/PoT/compile_container.py @@ -0,0 +1,19 @@ +# Import core modules and components +from omagent_core.utils.container import container + +# Import workflow related modules +from pathlib import Path +from omagent_core.utils.registry import registry + +# Set up path and import modules +CURRENT_PATH = root_path = Path(__file__).parents[0] +registry.import_module() + +# Register required components +container.register_callback(callback='AppCallback') +container.register_input(input='AppInput') +# Compile container config +container.compile_config(CURRENT_PATH) + + + diff --git a/examples/PoT/configs/llms/gpt.yml b/examples/PoT/configs/llms/gpt.yml new file mode 100644 index 00000000..6c86d9eb --- /dev/null +++ b/examples/PoT/configs/llms/gpt.yml @@ -0,0 +1,7 @@ +name: OpenaiGPTLLM +model_id: ${env| model_id, gpt-3.5-turbo} +api_key: ${env| custom_openai_key, openai_api_key} +endpoint: ${env| custom_openai_endpoint, https://api.openai.com/v1} +temperature: 0 +use_default_sys_prompt: false +vision: false \ No newline at end of file diff --git a/examples/PoT/configs/workers/PoT_workflow.yml b/examples/PoT/configs/workers/PoT_workflow.yml new file mode 100644 index 00000000..f258bc87 --- /dev/null +++ b/examples/PoT/configs/workers/PoT_workflow.yml @@ -0,0 +1,5 @@ +- name: PoTExecutor + llm: ${sub| gpt} +- name: ChoiceExtractor + llm: ${sub| gpt} +- name: PoTInputInterface \ No newline at end of file diff --git a/examples/PoT/docs/images/pot_workflow.jpg b/examples/PoT/docs/images/pot_workflow.jpg new file mode 100644 index 00000000..2c0625bf --- /dev/null +++ b/examples/PoT/docs/images/pot_workflow.jpg @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:fe844f063ec2f831cfb9a4b20fa274fd9f171ee10844630659116b072c98f458 +size 42124 diff --git a/examples/PoT/eval_aqua_zeroshot.py 
b/examples/PoT/eval_aqua_zeroshot.py
new file mode 100644
index 00000000..b484bd3f
--- /dev/null
+++ b/examples/PoT/eval_aqua_zeroshot.py
@@ -0,0 +1,111 @@
+# Import required modules and components
+from omagent_core.utils.container import container
+from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow
+from omagent_core.advanced_components.workflow.pot.workflow import PoTWorkflow
+from pathlib import Path
+from omagent_core.utils.registry import registry
+from omagent_core.clients.devices.programmatic.client import ProgrammaticClient
+from omagent_core.utils.logger import logging
+import argparse
+import json
+import os
+
+
+def parse_args():
+    """Parse command line arguments for AQUA evaluation"""
+    parser = argparse.ArgumentParser(description='Evaluate AQUA dataset using Program of Thought')
+    parser.add_argument('--endpoint', type=str, default="https://api.openai.com/v1",
+                        help='OpenAI API endpoint')
+    parser.add_argument('--api_key', type=str, default=None,
+                        help='OpenAI API key')
+    parser.add_argument('--model_id', type=str, default="gpt-3.5-turbo",
+                        help='Model ID to use')
+    parser.add_argument('--dataset_path', type=str, default="aqua_test.jsonl",
+                        help='Path to dataset')
+    parser.add_argument('--output_path', type=str, default='output',
+                        help='Path to output file. If not provided, will use default')
+    return parser.parse_args()
+
+def main():
+    """Main function to run AQUA evaluation"""
+    # Parse command line arguments
+    args = parse_args()
+
+    # Set environment variables for the API; skip the key when it is not
+    # supplied so os.environ is never assigned None (which raises a TypeError)
+    os.environ["custom_openai_endpoint"] = args.endpoint
+    if args.api_key:
+        os.environ["custom_openai_key"] = args.api_key
+    os.environ["model_id"] = args.model_id
+
+    # Load dataset and setup variables
+    dataset_path = args.dataset_path
+    model_id = args.model_id
+    dataset_name = 'aqua'
+
+    # Read dataset from JSONL file
+    datasets = []
+    with open(dataset_path, 'r') as f:
+        for line in f:
+            datasets.append(json.loads(line))
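+    # Each record in the JSONL file is expected to look roughly like this
+    # (an illustrative sketch; the field names are inferred from the accesses below):
+    #   {"id": 1, "question": "In how many ways ...", "options": ["A)2!", "B)3!", "C)7!", "D)8!", "E)9!"]}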
+
+    # Setup logging and paths
+    logging.init_logger("omagent", "omagent", level="INFO")
+    CURRENT_PATH = Path(__file__).parents[0]
+
+    # Initialize agent modules and configuration
+    registry.import_module(project_path=CURRENT_PATH.joinpath('agent'))
+    container.from_config(CURRENT_PATH.joinpath('container.yaml'))
+
+    # Setup Program of Thought workflow
+    workflow = ConductorWorkflow(name='PoT')
+    pot_workflow = PoTWorkflow()
+    pot_workflow.set_input(query=workflow.input('query'), examples=workflow.input('examples'), options=workflow.input('options'))
+    workflow >> pot_workflow
+    workflow.register(overwrite=True)
+
+    # Initialize programmatic client
+    config_path = CURRENT_PATH.joinpath('configs')
+    programmatic_client = ProgrammaticClient(processor=workflow, config_path=config_path)
+
+    # Prepare batch processing inputs
+    output_json = []
+    workflow_input_list = []
+    for question in datasets:
+        workflow_input_list.append({
+            "id": question['id'],
+            "query": question['question'],
+            "examples": None,
+            "options": str(question['options'])
+        })
+
+    # Process questions in batches
+    res = programmatic_client.start_batch_processor(workflow_input_list=workflow_input_list, max_tasks=5)
+
+    # Collect results
+    for r, w in zip(res, workflow_input_list):
+        output_json.append({
+            "id": w['id'],
+            "question": w['query'],
+            "last_output": r['last_output'],
+            "prompt_tokens": r['prompt_tokens'],
+            "completion_tokens": r['completion_tokens']
+        })
+
+    # Prepare final output
+    final_output = {
+        "dataset_name": dataset_name,
+        "model_id": model_id,
+        "alg": "POT",
+        "model_result": output_json
+    }
+
+    # Save results to output file
+    if not os.path.exists(args.output_path):
+        os.makedirs(args.output_path)
+    with open(f'{args.output_path}/{dataset_name}_{model_id}_POT_output.json', 'w') as f:
+        json.dump(final_output, f, indent=4)
+
+    # Cleanup
+    programmatic_client.stop_processor()
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/PoT/eval_gsm8k_fewshot.py b/examples/PoT/eval_gsm8k_fewshot.py
new file mode 100644
index 00000000..27a7453d
--- /dev/null
+++ b/examples/PoT/eval_gsm8k_fewshot.py
@@ -0,0 +1,174 @@
+# Import required modules and components
+from omagent_core.utils.container import container
+from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow
+from omagent_core.advanced_components.workflow.pot.workflow import PoTWorkflow
+from pathlib import Path
+from omagent_core.utils.registry import registry
+from omagent_core.clients.devices.programmatic.client import ProgrammaticClient
+from omagent_core.utils.logger import logging
+import argparse
+import json
+import os
+
+
+def parse_args():
+    """Parse command line arguments for GSM8K evaluation"""
+    parser = argparse.ArgumentParser(description='Evaluate GSM8K dataset using Program of Thought')
+    parser.add_argument('--endpoint', type=str, default="https://api.openai.com/v1",
+                        help='OpenAI API endpoint')
+    parser.add_argument('--api_key', type=str, default=None,
+                        help='OpenAI API key')
+    parser.add_argument('--model_id', type=str, default="gpt-3.5-turbo",
+                        help='Model ID to use')
+    parser.add_argument('--dataset_path', type=str, default="gsm8k_test.jsonl",
+                        help='Path to dataset')
+    parser.add_argument('--examples', type=str, default=None,
+                        help='Path to examples file. If not provided, will use default examples')
+    parser.add_argument('--output_path', type=str, default='output',
+                        help='Path to output file. If not provided, will use default')
+    return parser.parse_args()
+
+def main():
+    """Main function to run GSM8K evaluation"""
+    # Parse command line arguments
+    args = parse_args()
+
+    # Set environment variables for the OpenAI API; skip the key when it is not
+    # supplied so os.environ is never assigned None (which raises a TypeError)
+    os.environ["custom_openai_endpoint"] = args.endpoint
+    if args.api_key:
+        os.environ["custom_openai_key"] = args.api_key
+    os.environ["model_id"] = args.model_id
+
+    # Load examples for few-shot learning
+    if args.examples:
+        with open(args.examples) as f:
+            ex = f.read()
+    else:
+        # Default examples if none provided - contains simple arithmetic problems
+        ex = '''
+Question: There are 15 trees in the grove. Grove workers will plant trees in the grove today. After they are done, there will be 21 trees. How many trees did the grove workers plant today?
+# Python code, return ans
+total_trees = 15
+after_planted_trees = 21
+ans = after_planted_trees - total_trees
+
+Question: If there are 3 cars in the parking lot and 2 more cars arrive, how many cars are in the parking lot?
+# Python code, return ans
+total_cars = 3
+more_arrived_cars = 2
+ans = total_cars + more_arrived_cars
+
+Question: Leah had 32 chocolates and her sister had 42. If they ate 35, how many pieces do they have left in total?
+# Python code, return ans
+num_of_Leah_chocolates = 32
+num_of_sister_chocolates = 42
+total_chocolates = num_of_Leah_chocolates + num_of_sister_chocolates
+eaten_chocolates = 35
+ans = total_chocolates - eaten_chocolates
+
+Question: Jason had 20 lollipops. He gave Denny some lollipops. Now Jason has 12 lollipops. How many lollipops did Jason give to Denny?
+# Python code, return ans
+num_of_Jason_lollipops = 20
+num_of_given_lollipops = 12
+ans = num_of_Jason_lollipops - num_of_given_lollipops
+
+Question: Shawn has five toys. For Christmas, he got two toys each from his mom and dad. How many toys does he have now?
+# Python code, return ans
+num_of_Shawn_toys = 5
+num_of_toys_from_mom = 2
+num_of_toys_from_dad = 2
+ans = num_of_Shawn_toys + num_of_toys_from_mom + num_of_toys_from_dad
+
+Question: There were nine computers in the server room. Five more computers were installed each day, from monday to thursday. How many computers are now in the server room?
+# Python code, return ans
+num_of_computers_in_server_room = 9
+num_of_computers_installed_each_day = 5
+num_of_days = 4
+ans = num_of_computers_in_server_room + num_of_computers_installed_each_day * num_of_days
+
+Question: Michael had 58 golf balls. On tuesday, he lost 23 golf balls. On wednesday, he lost 2 more. How many golf balls did he have at the end of wednesday?
+# Python code, return ans
+num_of_Michael_golf_balls = 58
+num_of_golf_balls_lost_on_tuesday = 23
+num_of_golf_balls_lost_on_wednesday = 2
+ans = num_of_Michael_golf_balls - num_of_golf_balls_lost_on_tuesday - num_of_golf_balls_lost_on_wednesday
+
+Question: Olivia has $23. She bought five bagels for $3 each. How much money does she have left?
+# Python code, return ans
+num_of_Olivia_money = 23
+num_of_bagels = 5
+cost_of_each_bagel = 3
+ans = num_of_Olivia_money - num_of_bagels * cost_of_each_bagel
+'''
+
+    # Load dataset and setup variables
+    dataset_path = args.dataset_path
+    model_id = args.model_id
+    dataset_name = 'gsm8k'
+
+    # Read dataset from JSONL file
+    datasets = []
+    with open(dataset_path, 'r') as f:
+        for line in f:
+            datasets.append(json.loads(line))
+
+    # Setup logging and paths
+    logging.init_logger("omagent", "omagent", level="INFO")
+    CURRENT_PATH = Path(__file__).parents[0]
+
+    # Initialize agent modules and configuration
+    registry.import_module(project_path=CURRENT_PATH.joinpath('agent'))
+    container.from_config(CURRENT_PATH.joinpath('container.yaml'))
+
+    # Setup Program of Thought workflow
+    workflow = ConductorWorkflow(name='PoT')
+    pot_workflow = PoTWorkflow()
+    pot_workflow.set_input(query=workflow.input('query'), examples=workflow.input('examples'))
+    workflow >> pot_workflow
+    workflow.register(overwrite=True)
+
+    # Initialize programmatic client
+    config_path = CURRENT_PATH.joinpath('configs')
+    programmatic_client = ProgrammaticClient(processor=workflow, config_path=config_path)
+
+    # Prepare batch processing inputs; note that only the first 10 questions are
+    # evaluated here - drop the [:10] slice to run the full dataset
+    output_json = []
+    workflow_input_list = []
+    for question in datasets[:10]:
+        workflow_input_list.append({
+            "id": question['id'],
+            "query": question['question'],
+            "examples": ex
+        })
+
+    # Process questions in batches
+    res = programmatic_client.start_batch_processor(workflow_input_list=workflow_input_list, max_tasks=5)
+
+    # Collect results
+    for r, w in zip(res, workflow_input_list):
+        output_json.append({
+            "id": w['id'],
+            "question": w['query'],
+            "last_output": r['last_output'],
+            "prompt_tokens": r['prompt_tokens'],
+            "completion_tokens": r['completion_tokens']
+        })
+
+    # Prepare final output
+    final_output = {
+        "dataset_name": dataset_name,
+        "model_id": model_id,
+        "alg": "POT",
+        "model_result": output_json
+    }
+
+    # Save results to output file
+    if not os.path.exists(args.output_path):
+        os.makedirs(args.output_path)
+    with open(f'{args.output_path}/{dataset_name}_{model_id}_POT_output.json', 'w') as f:
+        json.dump(final_output, f, indent=4)
+
+    # Cleanup
programmatic_client.stop_processor() + +if __name__ == "__main__": + main() diff --git a/examples/PoT/run_cli.py b/examples/PoT/run_cli.py new file mode 100644 index 00000000..672822f9 --- /dev/null +++ b/examples/PoT/run_cli.py @@ -0,0 +1,45 @@ +# Import core modules and components for the Program of Thought (PoT) workflow +from omagent_core.utils.container import container +from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow +from omagent_core.engine.workflow.task.simple_task import simple_task +from agent.input_interface.input_interface import PoTInputInterface +from omagent_core.advanced_components.workflow.pot.workflow import PoTWorkflow +from pathlib import Path +from omagent_core.utils.registry import registry +from omagent_core.clients.devices.cli.client import DefaultClient +from omagent_core.utils.logger import logging + + +# Initialize logging with INFO level +logging.init_logger("omagent", "omagent", level="INFO") + +# Get the root directory path +CURRENT_PATH = Path(__file__).parents[0] + +# Load custom agent modules from the project directory +registry.import_module(project_path=CURRENT_PATH.joinpath('agent')) + +# Load container configuration from YAML file +container.from_config(CURRENT_PATH.joinpath('container.yaml')) + + +# Initialize the main Program of Thought workflow +workflow = ConductorWorkflow(name='PoT') + +# Create input interface task to handle user interactions +client_input_task = simple_task(task_def_name=PoTInputInterface, task_reference_name='PoT_input_interface') + +# Initialize PoT workflow and connect it with input task outputs +pot_workflow = PoTWorkflow() +pot_workflow.set_input(query=client_input_task.output('query'), examples=client_input_task.output('examples'), options=client_input_task.output('options')) + +# Chain tasks together: Input Interface -> PoT Workflow +workflow >> client_input_task >> pot_workflow + +# Register workflow with overwrite option enabled +workflow.register(overwrite=True) + +# Initialize and start CLI client with configured workflow +config_path = CURRENT_PATH.joinpath('configs') +cli_client = DefaultClient(interactor=workflow, config_path=config_path, workers=[PoTInputInterface()]) +cli_client.start_interactor() diff --git a/examples/PoT/run_programmatic.py b/examples/PoT/run_programmatic.py new file mode 100644 index 00000000..3f667ca7 --- /dev/null +++ b/examples/PoT/run_programmatic.py @@ -0,0 +1,46 @@ +# Import required modules and components +from omagent_core.utils.container import container +from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow +from omagent_core.advanced_components.workflow.pot.workflow import PoTWorkflow +from pathlib import Path +from omagent_core.utils.registry import registry +from omagent_core.clients.devices.programmatic.client import ProgrammaticClient +from omagent_core.utils.logger import logging + + +# Initialize logging with INFO level +logging.init_logger("omagent", "omagent", level="INFO") + +# Get the root directory path +CURRENT_PATH = Path(__file__).parents[0] + +# Load custom agent modules from the project directory +registry.import_module(project_path=CURRENT_PATH.joinpath('agent')) + +# Load container configuration from YAML file +container.from_config(CURRENT_PATH.joinpath('container.yaml')) + + +# Initialize the main Program of Thought workflow +workflow = ConductorWorkflow(name='PoT') +pot_workflow = PoTWorkflow() +pot_workflow.set_input(query=workflow.input('query'), examples=workflow.input('examples')) +workflow >> 
pot_workflow
+workflow.register(overwrite=True)
+
+# Initialize programmatic client
+config_path = CURRENT_PATH.joinpath('configs')
+programmatic_client = ProgrammaticClient(processor=workflow, config_path=config_path)
+
+# Prepare batch processing inputs
+workflow_input_list = [
+    {"query": "Tom gets 4 car washes a month. If each car wash costs $15 how much does he pay in a year?", "examples": None, "options": None}
+]
+
+# Process questions in batches
+res = programmatic_client.start_batch_processor(workflow_input_list=workflow_input_list, max_tasks=5)
+
+print(res)
+
+# Cleanup
+programmatic_client.stop_processor()
\ No newline at end of file
diff --git a/examples/PoT/run_webpage.py b/examples/PoT/run_webpage.py
new file mode 100644
index 00000000..391dbe75
--- /dev/null
+++ b/examples/PoT/run_webpage.py
@@ -0,0 +1,45 @@
+# Import core modules and components for the Program of Thought (PoT) workflow
+from omagent_core.utils.container import container
+from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow
+from omagent_core.engine.workflow.task.simple_task import simple_task
+from agent.input_interface.input_interface import PoTInputInterface
+from omagent_core.advanced_components.workflow.pot.workflow import PoTWorkflow
+from pathlib import Path
+from omagent_core.utils.registry import registry
+from omagent_core.clients.devices.webpage.client import WebpageClient
+from omagent_core.utils.logger import logging
+
+
+# Initialize logging with INFO level
+logging.init_logger("omagent", "omagent", level="INFO")
+
+# Get the root directory path
+CURRENT_PATH = Path(__file__).parents[0]
+
+# Load custom agent modules from the project directory
+registry.import_module(project_path=CURRENT_PATH.joinpath('agent'))
+
+# Load container configuration from YAML file
+container.from_config(CURRENT_PATH.joinpath('container.yaml'))
+
+
+# Initialize the main Program of Thought workflow
+workflow = ConductorWorkflow(name='PoT')
+
+# Create input interface task to handle user interactions
+client_input_task = simple_task(task_def_name=PoTInputInterface, task_reference_name='PoT_input_interface')
+
+# Initialize PoT workflow and connect it with input task outputs
+pot_workflow = PoTWorkflow()
+pot_workflow.set_input(query=client_input_task.output('query'), examples=client_input_task.output('examples'), options=client_input_task.output('options'))
+
+# Chain tasks together: Input Interface -> PoT Workflow
+workflow >> client_input_task >> pot_workflow
+
+# Register workflow with overwrite option enabled
+workflow.register(overwrite=True)
+
+# Initialize and start the webpage client with the configured workflow
+config_path = CURRENT_PATH.joinpath('configs')
+webpage_client = WebpageClient(interactor=workflow, config_path=config_path, workers=[PoTInputInterface()])
+webpage_client.start_interactor()
diff --git a/omagent-core/src/omagent_core/advanced_components/workflow/pot/README.md b/omagent-core/src/omagent_core/advanced_components/workflow/pot/README.md
new file mode 100644
index 00000000..bc75858c
--- /dev/null
+++ b/omagent-core/src/omagent_core/advanced_components/workflow/pot/README.md
@@ -0,0 +1,35 @@
+# Program-of-Thought (PoT) Operator
+Program-of-Thought (PoT) is a workflow operator that solves math word problems by generating and executing Python code. It consists of two main components:
+
+1. A PoT Executor that uses an LLM to generate Python code implementing the solution steps, safely executes the code in an isolated environment, and extracts the numerical answer.
+
+2. A Choice Extractor that processes multiple choice questions by analyzing the generated answer against the provided options using an LLM.
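+
+For intuition, the code the executor asks the LLM to produce looks roughly like the sketch below (illustrative only; the question is taken from the few-shot defaults in `examples/PoT`, and in the zero-shot path the executor itself prepends the imports and appends the `ans = solver()` call, see PoTExecutor.py):
+
+```python
+import math
+import numpy as np
+import statistics
+
+def solver():
+    # Olivia has $23. She bought five bagels for $3 each. How much money does she have left?
+    num_of_Olivia_money = 23
+    num_of_bagels = 5
+    cost_of_each_bagel = 3
+    return num_of_Olivia_money - num_of_bagels * cost_of_each_bagel
+
+ans = solver()
+```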
+
+You can refer to the examples in the `examples/PoT` directory to understand how to use this operator.
+
+# Inputs, Outputs and Configs
+
+## Inputs:
+The inputs that the Program-of-Thought (PoT) operator requires are as follows:
+| Name     | Type | Required | Description |
+| -------- | ----- | ----- | ---- |
+| query | str | true | The math word problem text to solve |
+| examples | str | false | Few-shot examples to guide code generation. If provided, uses a specialized few-shot prompt template |
+| options | str | false | Multiple choice options. If provided, triggers the Choice Extractor to analyze the answer against these options |
+
+## Outputs:
+The outputs that the Program-of-Thought (PoT) operator returns are as follows:
+| Name     | Type | Description |
+| -------- | ----- | ---- |
+| last_output | Union[float, str] | For regular problems: the numerical answer as a float. For multiple choice: the selected option as a string |
+| completion_tokens | int | Cumulative number of tokens in LLM completions |
+| prompt_tokens | int | Cumulative number of tokens in LLM prompts |
+
+## Configs:
+The config of the Program-of-Thought (PoT) operator is as follows; you can simply copy and paste it into your project as a pot_workflow.yml file.
+```yml
+- name: PoTExecutor
+  llm: ${sub| gpt}
+- name: ChoiceExtractor
+  llm: ${sub| gpt}
+```
\ No newline at end of file
diff --git a/omagent-core/src/omagent_core/advanced_components/workflow/pot/__init__.py b/omagent-core/src/omagent_core/advanced_components/workflow/pot/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/omagent-core/src/omagent_core/advanced_components/workflow/pot/agent/choice_extractor/choice_extractor.py b/omagent-core/src/omagent_core/advanced_components/workflow/pot/agent/choice_extractor/choice_extractor.py
new file mode 100644
index 00000000..7310a108
--- /dev/null
+++ b/omagent-core/src/omagent_core/advanced_components/workflow/pot/agent/choice_extractor/choice_extractor.py
@@ -0,0 +1,92 @@
+from pathlib import Path
+from omagent_core.engine.worker.base import BaseWorker
+from omagent_core.models.llms.base import BaseLLMBackend
+from omagent_core.utils.registry import registry
+from omagent_core.models.llms.openai_gpt import OpenaiGPTLLM
+from omagent_core.models.llms.prompt.prompt import PromptTemplate
+from pydantic import Field
+from typing import List
+from omagent_core.utils.logger import logging
+
+
+# Get absolute path to the directory containing this file
+CURRENT_PATH = root_path = Path(__file__).parents[0]
+
+
+@registry.register_worker()
+class ChoiceExtractor(BaseWorker, BaseLLMBackend):
+    """Choice extractor that processes multiple choice questions using an LLM.
+
+    This processor analyzes multiple choice questions and extracts the most likely answer:
+    1. Takes a multiple choice question and its options as input
+    2. Uses the LLM to analyze the question and the provided answer choices
+    3. Returns the selected answer choice
+    4. Tracks token usage for monitoring purposes
+
+    Attributes:
+        llm (OpenaiGPTLLM): The OpenAI GPT model used for answer analysis
+        prompts (List[PromptTemplate]): List of prompt templates used for LLM interaction.
+            Defaults to using the basic user prompt template.
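+
+    Example of the structure returned by _run (illustrative values):
+        {'last_output': 'B', 'completion_tokens': 120, 'prompt_tokens': 850}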
+ """ + llm: OpenaiGPTLLM + prompts: List[PromptTemplate] = Field( + default=[ + PromptTemplate.from_file( + CURRENT_PATH.joinpath("user_prompt.prompt"), role="user" + ), + ] + ) + + def _run(self, query: str, prediction: str = None, completion_tokens: int = 0, prompt_tokens: int = 0, examples: str = None, options: str = None, *args, **kwargs): + """Processes a multiple choice question and extracts the most likely answer. + + This method coordinates the answer extraction process: + 1. Validates input parameters + 2. If no options provided, returns existing prediction + 3. Otherwise uses LLM to analyze question and options + 4. Tracks and accumulates token usage + 5. Returns selected answer with usage metrics + + Args: + query (str): The multiple choice question text + prediction (str, optional): Existing prediction to return if no options + completion_tokens (int): Running count of completion tokens used + prompt_tokens (int): Running count of prompt tokens used + examples (str, optional): Few-shot examples to guide answer selection + options (list, optional): List of possible answer choices + *args: Variable length argument list + **kwargs: Arbitrary keyword arguments + + Returns: + dict: A dictionary containing: + - 'last_output': The selected answer choice + - 'completion_tokens': Updated completion token count + - 'prompt_tokens': Updated prompt token count + + Note: + - Handles both direct return of predictions and LLM-based analysis + - Accumulates token counts across multiple LLM calls + - Logs extracted choices and usage metrics for monitoring + """ + if query == '' or query is None: + answer = {'last_output': None, 'completion_tokens': 0, 'prompt_tokens': 0} + + if options is None: + answer = {'last_output': prediction, 'completion_tokens': completion_tokens, 'prompt_tokens': prompt_tokens} + else: + chat_complete_res = self.simple_infer(question=query, options=options, prediction=prediction) + + # Extract selected answer choice from LLM response + result = chat_complete_res["choices"][0]["message"]["content"] + + logging.info('extracted choice: {}'.format(result)) + logging.info(chat_complete_res['usage']) + + # Accumulate token usage metrics + completion_tokens += chat_complete_res['usage']['completion_tokens'] + prompt_tokens += chat_complete_res['usage']['prompt_tokens'] + + answer = {'last_output': result, 'completion_tokens': completion_tokens, 'prompt_tokens': prompt_tokens} + + self.callback.send_answer(self.workflow_instance_id, msg=answer, filter_special_symbols=False) + return answer \ No newline at end of file diff --git a/omagent-core/src/omagent_core/advanced_components/workflow/pot/agent/choice_extractor/user_prompt.prompt b/omagent-core/src/omagent_core/advanced_components/workflow/pot/agent/choice_extractor/user_prompt.prompt new file mode 100644 index 00000000..05ded836 --- /dev/null +++ b/omagent-core/src/omagent_core/advanced_components/workflow/pot/agent/choice_extractor/user_prompt.prompt @@ -0,0 +1,37 @@ + +Find the closest options based on the question and prediction, only return the option letter. + +Question: A company produces 420 units of a particular computer component every month, at a production cost to the company of $110 per component, and sells all of the components by the end of each month. What is the minimum selling price per component that will guarantee that the yearly profit (revenue from sales minus production costs) will be at least $626,400 ? 
+Options: ['A)226', 'B)230', 'C)240', 'D)260', 'E)280']
+Prediction: 234.28571428571428
+Closest Option: B
+
+Question: In how many ways can the letters of the word "PROBLEC" be rearranged to make 7 letter words such that none of the letters repeat?
+Options: ['A)2!', 'B)3!', 'C)7!', 'D)8!', 'E)9!']
+Prediction: 5040
+Closest Option: C
+
+Question: An exam is given in a certain class. The average (arithmetic mean) of the highest score and the lowest score is equal to x. If the average score for the entire class is equal to y and there are z students in the class, where z > 5, then in terms of x, y, and z, what is the average score for the class excluding the highest and lowest scorers?
+Options: ['A)(zy – 2x)/z', 'B)(zy – 2)/z', 'C)(zx – y)/(z – 2)', 'D)(zy – 2x)/(z -2)', 'E)(zy – x)/(z + 2)']
+Prediction: (-2*x + y*z)/(z - 2)
+Closest Option: D
+
+Question: Find the total no. of distinct bike no.'s that can be formed using 2 letters followed by 2 no.'s. How many letters need to be distinct?
+Options: ["A)74453", "B)64543", "C)74325", "D)65000", "E)97656"]
+Prediction: 67600
+Closest Option: D
+
+Question: A wire in the shape of rectangle of length 27 cm and breadth 17 cm is rebent to form a square. What will be the measure of each side?
+Options: ['A)9', 'B)11', 'C)22', 'D)25', 'E)31']
+Prediction: [-21.42428528562855, 21.42428528562855]
+Closest Option: C
+
+Question: A point on the edge of a fan blade that is rotating in a plane 10 centimeters from the center of the fan. What is the distance traveled, in centimeters, by this point after 30 seconds when the fan runs at the rate of 300 revolutions per minute?
+Options: ['A)750pi', 'B)1500pi', 'C)1875pi', 'D)3000pi', 'E)7500pi']
+Prediction: 9424.77
+Closest Option: D
+
+Question: {{question}}
+Options: {{options}}
+Prediction: {{prediction}}
+Closest Option: 
\ No newline at end of file
diff --git a/omagent-core/src/omagent_core/advanced_components/workflow/pot/agent/executor/PoTExecutor.py b/omagent-core/src/omagent_core/advanced_components/workflow/pot/agent/executor/PoTExecutor.py
new file mode 100644
index 00000000..b738c859
--- /dev/null
+++ b/omagent-core/src/omagent_core/advanced_components/workflow/pot/agent/executor/PoTExecutor.py
@@ -0,0 +1,277 @@
+from pathlib import Path
+from omagent_core.engine.worker.base import BaseWorker
+from omagent_core.models.llms.base import BaseLLMBackend
+from omagent_core.utils.registry import registry
+from omagent_core.models.llms.openai_gpt import OpenaiGPTLLM
+from omagent_core.models.llms.prompt.prompt import PromptTemplate
+from pydantic import Field
+from typing import List
+from omagent_core.utils.logger import logging
+import func_timeout
+import math
+
+# Get absolute path to the directory containing this file
+CURRENT_PATH = root_path = Path(__file__).parents[0]
+
+
+@registry.register_worker()
+class PoTExecutor(BaseWorker, BaseLLMBackend):
+    """Program-of-Thought (PoT) executor that solves math word problems step by step.
+
+    This executor takes a natural language math problem as input, uses a large language model (LLM)
+    to generate Python code that solves the problem, safely executes the generated code in an
+    isolated environment, and returns the numerical answer.
+
+    The workflow consists of:
+    1. Receiving a math word problem text input
+    2. Using an LLM (OpenAI GPT) to generate Python code that solves the problem
+    3. Safely executing the generated code with timeouts and error handling
+    4. Processing and returning the numerical answer
+
+    Attributes:
+        llm (OpenaiGPTLLM): The OpenAI GPT model used for code generation
+        prompts (List[PromptTemplate]): List of prompt templates used for LLM interaction.
+            Defaults to using the basic user prompt template.
+    """
+    llm: OpenaiGPTLLM
+    prompts: List[PromptTemplate] = Field(
+        default=[
+            PromptTemplate.from_file(
+                CURRENT_PATH.joinpath("user_prompt.prompt"), role="user"
+            ),
+        ]
+    )
+
+    def floatify_ans(self, ans):
+        """Attempts to convert an answer to float format when possible.
+
+        This method handles various input types and tries to convert them to a float
+        representation while preserving the semantic meaning of the answer.
+
+        Args:
+            ans: Answer in any format (dict, bool, list, tuple, etc.)
+
+        Returns:
+            float or str: The converted answer as a float when possible, otherwise as a string.
+                Returns None if input is None or an empty list/tuple.
+
+        Examples:
+            >>> floatify_ans({'result': 42})
+            42
+            >>> floatify_ans(True)
+            True
+            >>> floatify_ans([3.14])
+            3.14
+        """
+        if ans is None:
+            return None
+        elif type(ans) == dict:
+            # For dictionaries, extract the first value
+            ans = list(ans.values())[0]
+        elif type(ans) == bool:
+            # Preserve boolean values without conversion
+            ans = ans
+        elif type(ans) in [list, tuple]:
+            if not ans:
+                return None
+            else:
+                # For sequences, try to convert first element to float
+                try:
+                    ans = float(ans[0])
+                except Exception:
+                    ans = str(ans[0])
+        else:
+            # For all other types, attempt float conversion
+            try:
+                ans = float(ans)
+            except Exception:
+                ans = str(ans)
+        return ans
+
+    def safe_execute(self, code_string: str, keys=None):
+        """Safely executes generated Python code with timeout protection.
+
+        Provides a sandboxed environment for executing potentially unsafe code
+        with a timeout mechanism to prevent infinite loops or long-running code.
+
+        Args:
+            code_string (str): Python code to execute
+            keys (List[str], optional): List of variable names to extract from locals()
+
+        Returns:
+            Any: The execution result or None if execution fails/times out.
+                If keys are provided, returns a list of values for those keys from locals().
+
+        Note:
+            - Code execution is limited to 5 seconds
+            - Automatically handles markdown code block formatting
+            - Catches and logs all execution exceptions
+        """
+        def execute(x):
+            try:
+                # exec writes the generated variables (e.g. 'ans') into this
+                # function's locals() dict, from which they are read back below
+                exec(x)
+                locals_ = locals()
+                if keys is None:
+                    return locals_.get('ans', None)
+                else:
+                    return [locals_.get(k, None) for k in keys]
+            except Exception as e:
+                logging.info("Execution error: error message {}, code_string {}".format(e, code_string))
+                return None
+        try:
+            # Clean up markdown code formatting
+            if code_string.startswith('```python'):
+                code_string = code_string[9:]
+            if code_string.endswith('ans\n```'):
+                code_string = code_string[:-7]
+            if code_string.endswith('```'):
+                code_string = code_string[:-3]
+            # Execute with 5 second timeout
+            ans = func_timeout.func_timeout(5, execute, args=(code_string,))
+        except func_timeout.FunctionTimedOut:
+            ans = None
+
+        return ans
+
+    def simplify_ans(self, ans, convert_to_str: bool = True):
+        """Simplifies and normalizes answer formats to consistent representations.
+
+        Handles various numeric types including numpy arrays, sympy expressions,
+        and other mathematical objects, converting them to simple float or string format.
+
+        Args:
+            ans: Answer in any format (numpy array, sympy expression, etc.)
+            convert_to_str (bool): Whether to convert the final result to string
+
+        Returns:
+            Union[float, str, None]: Simplified answer in float or string format.
+                Returns None if input is falsy.
+
+        Note:
+            - Rounds floating point numbers to 2 decimal places
+            - Handles special cases for numpy arrays and sympy expressions
+            - Preserves relational expressions as strings
+        """
+        if 'relational' in str(type(ans)):
+            # Preserve relational expressions as strings
+            return str(ans)
+        elif 'numpy' in str(type(ans)):
+            if ans.shape == ():
+                # Handle scalar numpy value
+                ans = round(float(ans), 2)
+            else:
+                # Handle array numpy value - take first element
+                ans = round(float(ans[0]), 2)
+            if convert_to_str:
+                return str(ans)
+            else:
+                return ans
+        elif not ans:
+            return None
+        else:
+            if type(ans) in [list, tuple]:
+                # Handle sympy expressions in lists/tuples
+                if 'sympy' in str(type(ans[0])):
+                    try:
+                        ans = [round(float(x), 2) for x in ans]
+                    except Exception:
+                        ans = [str(x) for x in ans]
+                if len(ans) == 1:
+                    ans = ans[0]
+            else:
+                # Handle single sympy expression
+                if 'sympy' in str(type(ans)):
+                    try:
+                        ans = round(float(ans), 2)
+                    except Exception:
+                        ans = str(ans)
+            if convert_to_str:
+                return str(ans)
+            else:
+                return ans
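+
+    # Illustrative behavior of the two normalization helpers (assumed values):
+    #   simplify_ans(np.float64(3.14159))                      -> '3.14'
+    #   floatify_ans(simplify_ans(np.float64(3.14159), False)) -> 3.14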
+
+    def _run(self, query: str, examples: str = None, options: str = None, *args, **kwargs):
+        """Processes a math word problem and returns the numerical answer.
+
+        This is the main execution method that coordinates the entire solution process:
+        1. Validates input and selects the appropriate prompt template
+        2. Generates Python code using the LLM
+        3. Safely executes the code
+        4. Processes and returns the result
+
+        Args:
+            query (str): The math word problem text to solve
+            examples (str, optional): Few-shot examples to guide code generation
+            options (str, optional): Multiple choice options, if any
+            *args: Variable length argument list
+            **kwargs: Arbitrary keyword arguments
+
+        Returns:
+            dict: A dictionary containing:
+                - 'last_output': The numerical answer
+                - 'completion_tokens': Number of tokens in the completion
+                - 'prompt_tokens': Number of tokens in the prompt
+
+        Note:
+            - Handles both zero-shot and few-shot scenarios
+            - Automatically adds necessary imports for zero-shot cases
+            - Logs all major steps for debugging
+        """
+        logging.info("input query: {}".format(query))
+        if query == '' or query is None:
+            return {'last_output': None, 'completion_tokens': 0, 'prompt_tokens': 0}
+
+        # Select the appropriate prompt template based on whether examples are provided
+        if examples is None:
+            # Use standard prompt for zero-shot
+            self.prompts = [
+                PromptTemplate.from_file(
+                    CURRENT_PATH.joinpath("user_prompt.prompt"), role="user"
+                ),
+            ]
+            chat_complete_res = self.simple_infer(question=query)
+        else:
+            # Use few-shot prompt with examples
+            self.prompts = [
+                PromptTemplate.from_file(
+                    CURRENT_PATH.joinpath("user_prompt_fewshot.prompt"), role="user"
+                ),
+            ]
+            chat_complete_res = self.simple_infer(question=query, examples=examples)
+
+        # Extract generated code from LLM response
+        result = chat_complete_res["choices"][0]["message"]["content"]
+
+        # Clean up markdown code formatting
+        if result.startswith('```python\n'):
+            result = result[10:]
+        if result.startswith('```python'):
+            result = result[9:]
+        if result.endswith('solver()\n```'):
+            result = result.replace('solver()\n```', '')
+        if result.endswith('```'):
+            result = result[:-3]
+
+        # For zero-shot cases, add imports and answer extraction
+        if examples is None:
+            if result.startswith('def solver():'):
+                result = result.replace('def solver():', '')
+            result = 'import math\nimport numpy as np\nimport statistics\ndef solver():\n' + result + '\nans = solver()'
+
+        logging.info('generated execution code: {}'.format(result))
+
+        # Execute the code safely and process the result
+        ans = self.safe_execute(result)
+
+        if options is None:
+            prediction = self.floatify_ans(self.simplify_ans(ans, False))
+        else:
+            prediction = self.floatify_ans(self.simplify_ans(ans, True))
+        logging.info("Result of execution: {}".format(prediction))
+        logging.info(chat_complete_res['usage'])
+
+        completion_tokens = chat_complete_res['usage']['completion_tokens']
+        prompt_tokens = chat_complete_res['usage']['prompt_tokens']
+
+        return {'last_output': prediction, 'completion_tokens': completion_tokens, 'prompt_tokens': prompt_tokens}
\ No newline at end of file
diff --git a/omagent-core/src/omagent_core/advanced_components/workflow/pot/agent/executor/user_prompt.prompt b/omagent-core/src/omagent_core/advanced_components/workflow/pot/agent/executor/user_prompt.prompt
new file mode 100644
index 00000000..301754df
--- /dev/null
+++ b/omagent-core/src/omagent_core/advanced_components/workflow/pot/agent/executor/user_prompt.prompt
@@ -0,0 +1,10 @@
+
+import math
+import numpy as np
+import statistics
+
+# Question: {{question}}
+# Answer this question by implementing a solver() function; use a for loop if necessary. Only complete the function, do not return any other text.
+def solver():
+    # Let's write a Python program step by step, and then return the answer
+    # Firstly, we need to define the following variable:
diff --git a/omagent-core/src/omagent_core/advanced_components/workflow/pot/agent/executor/user_prompt_fewshot.prompt b/omagent-core/src/omagent_core/advanced_components/workflow/pot/agent/executor/user_prompt_fewshot.prompt
new file mode 100644
index 00000000..d50f4851
--- /dev/null
+++ b/omagent-core/src/omagent_core/advanced_components/workflow/pot/agent/executor/user_prompt_fewshot.prompt
@@ -0,0 +1,4 @@
+Read the following questions and answer them with Python code, storing the result in a variable named 'ans'.
+{{examples}}
+Question: {{question}}
+# Python code, return ans
diff --git a/omagent-core/src/omagent_core/advanced_components/workflow/pot/workflow.py b/omagent-core/src/omagent_core/advanced_components/workflow/pot/workflow.py
new file mode 100644
index 00000000..dfd67385
--- /dev/null
+++ b/omagent-core/src/omagent_core/advanced_components/workflow/pot/workflow.py
@@ -0,0 +1,57 @@
+from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow
+from omagent_core.engine.workflow.task.simple_task import simple_task
+from omagent_core.advanced_components.workflow.pot.agent.executor.PoTExecutor import PoTExecutor
+from omagent_core.advanced_components.workflow.pot.agent.choice_extractor.choice_extractor import ChoiceExtractor
+from typing import List
+
+class PoTWorkflow(ConductorWorkflow):
+    """Program-of-Thought workflow that executes math word problem solving tasks.
+
+    This workflow configures the PoT executor and choice extractor to solve
+    math problems by generating and running Python code.
+    """
+    def __init__(self):
+        super().__init__(name='pot_workflow')
+
+    def set_input(self, query: str, examples: str = None, options: str = None):
+        """Set input parameters and configure the workflow.
+
+        Args:
+            query: Math word problem text to solve
+            examples: Optional few-shot examples to guide code generation
+            options: Optional multiple choice options string
+        """
+        self.query = query
+        self.examples = examples
+        self.options = options
+        self._configure_tasks()
+        self._configure_workflow()
+
+    def _configure_tasks(self):
+        """Configure the PoT executor and choice extractor tasks with input parameters."""
+        self.pot_executor_task = simple_task(
+            task_def_name=PoTExecutor,
+            task_reference_name='PoT_executor',
+            inputs={'query': self.query, 'examples': self.examples, 'options': self.options}
+        )
+        self.choice_extractor_task = simple_task(
+            task_def_name=ChoiceExtractor,
+            task_reference_name='choice_extractor',
+            inputs={'query': self.query,
+                    'examples': self.examples,
+                    'options': self.options,
+                    'prediction': self.pot_executor_task.output('last_output'),
+                    'completion_tokens': self.pot_executor_task.output('completion_tokens'),
+                    'prompt_tokens': self.pot_executor_task.output('prompt_tokens')}
+        )
+
+    def _configure_workflow(self):
+        """Configure workflow execution flow and outputs.
+
+        Sets up the task chain (executor, then choice extractor) and captures
+        the final answer and token counts from the choice extractor's output.
+        """
+        self >> self.pot_executor_task >> self.choice_extractor_task
+        self.last_output = self.choice_extractor_task.output('last_output')
+        self.completion_tokens = self.choice_extractor_task.output('completion_tokens')
+        self.prompt_tokens = self.choice_extractor_task.output('prompt_tokens')
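
For a quick end-to-end picture of how this operator is consumed, the following condenses `examples/PoT/run_programmatic.py` from this diff into one sketch (the printed result values are illustrative):

```python
from pathlib import Path

from omagent_core.advanced_components.workflow.pot.workflow import PoTWorkflow
from omagent_core.clients.devices.programmatic.client import ProgrammaticClient
from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow
from omagent_core.utils.container import container
from omagent_core.utils.registry import registry

CURRENT_PATH = Path(__file__).parents[0]
registry.import_module(project_path=CURRENT_PATH.joinpath('agent'))
container.from_config(CURRENT_PATH.joinpath('container.yaml'))

# Wrap the PoT operator in a parent workflow and register it with Conductor
workflow = ConductorWorkflow(name='PoT')
pot_workflow = PoTWorkflow()
pot_workflow.set_input(query=workflow.input('query'), examples=workflow.input('examples'))
workflow >> pot_workflow
workflow.register(overwrite=True)

# Run a single question and read back the structured result
client = ProgrammaticClient(processor=workflow, config_path=CURRENT_PATH.joinpath('configs'))
res = client.start_batch_processor(
    workflow_input_list=[{"query": "Tom gets 4 car washes a month. If each car wash costs $15 how much does he pay in a year?",
                          "examples": None, "options": None}],
    max_tasks=5,
)
print(res)  # e.g. [{'last_output': 720.0, 'completion_tokens': ..., 'prompt_tokens': ...}]
client.stop_processor()
```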