From ea99acf6e4bf831a655feac0074e89ba5b72d7db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro?= Date: Wed, 5 Feb 2025 18:35:20 +0000 Subject: [PATCH 1/9] Base docker image and server. Still WIP --- apps/aifindr-evaluations-runner/.gitignore | 3 ++ apps/aifindr-evaluations-runner/Dockerfile | 10 ++++ apps/aifindr-evaluations-runner/evaluator.py | 49 +++++++++++++++++++ apps/aifindr-evaluations-runner/main.py | 30 ++++++++++++ .../requirements.txt | 4 ++ deployment/docker-compose/docker-compose.yaml | 7 +++ 6 files changed, 103 insertions(+) create mode 100644 apps/aifindr-evaluations-runner/.gitignore create mode 100644 apps/aifindr-evaluations-runner/Dockerfile create mode 100644 apps/aifindr-evaluations-runner/evaluator.py create mode 100644 apps/aifindr-evaluations-runner/main.py create mode 100644 apps/aifindr-evaluations-runner/requirements.txt diff --git a/apps/aifindr-evaluations-runner/.gitignore b/apps/aifindr-evaluations-runner/.gitignore new file mode 100644 index 0000000000..71f91a18ae --- /dev/null +++ b/apps/aifindr-evaluations-runner/.gitignore @@ -0,0 +1,3 @@ +venv/ +__pycache__/ +*.pyc \ No newline at end of file diff --git a/apps/aifindr-evaluations-runner/Dockerfile b/apps/aifindr-evaluations-runner/Dockerfile new file mode 100644 index 0000000000..b0d54404e6 --- /dev/null +++ b/apps/aifindr-evaluations-runner/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001", "--log-level", "debug"] \ No newline at end of file diff --git a/apps/aifindr-evaluations-runner/evaluator.py b/apps/aifindr-evaluations-runner/evaluator.py new file mode 100644 index 0000000000..b78959c4ab --- /dev/null +++ b/apps/aifindr-evaluations-runner/evaluator.py @@ -0,0 +1,49 @@ +import os + +from opik import Opik +from opik.evaluation import evaluate +from opik.evaluation.metrics import (Hallucination, Moderation, AnswerRelevance, ContextRecall, ContextPrecision) + +os.environ["OPIK_URL_OVERRIDE"] = "http://host.docker.internal:5173/api" +os.environ["OPENAI_API_KEY"] = "" + +client = Opik() +dataset = client.get_dataset(name="BCP eval dataset") +prompt = client.get_prompt(name="BCP eval prompt") + +def evaluation_task(dataset_item): + print("Experiment item: ", dataset_item) + # your LLM application is called here + + + + result = { + "input": dataset_item['query'], + "output": "Es madrid", + "context": [] + } + return result + + +def execute_evaluation(dataset_name: str, experiment_name: str, project_name: str, base_prompt_name: str): + client = Opik() + dataset = client.get_dataset(name=dataset_name) + base_prompt = client.get_prompt(name=base_prompt_name) + metrics = [Hallucination(), Moderation(), AnswerRelevance(), ContextRecall(), ContextPrecision()] + + print("Base prompt: ", base_prompt) + + eval_results = evaluate( + experiment_name=experiment_name, + dataset=dataset, + task=evaluation_task, + scoring_metrics=metrics, + project_name=project_name, + experiment_config={ + "pepe": "paco" + }, + scoring_key_mapping={"expected_output": "criteria"}, + prompt=base_prompt + ) + + return eval_results diff --git a/apps/aifindr-evaluations-runner/main.py b/apps/aifindr-evaluations-runner/main.py new file mode 100644 index 0000000000..6c33f8a3cc --- /dev/null +++ b/apps/aifindr-evaluations-runner/main.py @@ -0,0 +1,30 @@ +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +from typing import Any, Dict 
+import logging +from evaluator import execute_evaluation + +# Configurar logging +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +app = FastAPI() + +class EvaluationRequest(BaseModel): + dataset_name: str + experiment_name: str + project_name: str + base_prompt_name: str +class EvaluationResponse(BaseModel): + result: Dict[str, Any] + +@app.post("/evaluations/run", response_model=EvaluationResponse) +async def run_evaluation(request: EvaluationRequest): + try: + logger.debug(f"Received request: {request}") + + result = execute_evaluation(request.dataset_name, request.experiment_name, request.project_name, request.base_prompt_name) + return EvaluationResponse(result=result) + except Exception as e: + logger.error(f"Error processing request: {str(e)}") + raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file diff --git a/apps/aifindr-evaluations-runner/requirements.txt b/apps/aifindr-evaluations-runner/requirements.txt new file mode 100644 index 0000000000..01d3c95a7b --- /dev/null +++ b/apps/aifindr-evaluations-runner/requirements.txt @@ -0,0 +1,4 @@ +fastapi==0.115.8 +uvicorn==0.34.0 +pydantic==2.10.6 +opik==1.4.10 \ No newline at end of file diff --git a/deployment/docker-compose/docker-compose.yaml b/deployment/docker-compose/docker-compose.yaml index 5a73be8aee..97ae8ffb7e 100644 --- a/deployment/docker-compose/docker-compose.yaml +++ b/deployment/docker-compose/docker-compose.yaml @@ -116,6 +116,13 @@ services: backend: condition: service_started + aifindr-evaluations-runner: + build: + context: ../../apps/aifindr-evaluations-runner + dockerfile: Dockerfile + ports: + - "8001:8001" + networks: default: From 60d96cae3ec9608cdbc306c264687ebfa6b5a908 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro?= Date: Thu, 6 Feb 2025 16:57:44 +0000 Subject: [PATCH 2/9] Get env vars from .env and use a settings class --- apps/aifindr-evaluations-runner/.env.example | 2 ++ apps/aifindr-evaluations-runner/.gitignore | 3 ++- apps/aifindr-evaluations-runner/config.py | 11 +++++++++++ apps/aifindr-evaluations-runner/evaluator.py | 8 +++++--- apps/aifindr-evaluations-runner/requirements.txt | 4 +++- .../docker-compose/docker-compose.override.yaml | 5 +++++ deployment/docker-compose/docker-compose.yaml | 3 +++ 7 files changed, 31 insertions(+), 5 deletions(-) create mode 100644 apps/aifindr-evaluations-runner/.env.example create mode 100644 apps/aifindr-evaluations-runner/config.py diff --git a/apps/aifindr-evaluations-runner/.env.example b/apps/aifindr-evaluations-runner/.env.example new file mode 100644 index 0000000000..16e64c844c --- /dev/null +++ b/apps/aifindr-evaluations-runner/.env.example @@ -0,0 +1,2 @@ +OPIK_URL=http://host.docker.internal:5173/api +OPENAI_API_KEY=your-api-key-here \ No newline at end of file diff --git a/apps/aifindr-evaluations-runner/.gitignore b/apps/aifindr-evaluations-runner/.gitignore index 71f91a18ae..ae3120ea09 100644 --- a/apps/aifindr-evaluations-runner/.gitignore +++ b/apps/aifindr-evaluations-runner/.gitignore @@ -1,3 +1,4 @@ venv/ __pycache__/ -*.pyc \ No newline at end of file +*.pyc +.env \ No newline at end of file diff --git a/apps/aifindr-evaluations-runner/config.py b/apps/aifindr-evaluations-runner/config.py new file mode 100644 index 0000000000..5058063356 --- /dev/null +++ b/apps/aifindr-evaluations-runner/config.py @@ -0,0 +1,11 @@ +import os +from pydantic_settings import BaseSettings + +class Settings(BaseSettings): + OPIK_URL: str = os.getenv("OPIK_URL", 
"http://host.docker.internal:5173/api") + OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "") + + class Config: + env_file = ".env" + +settings = Settings() \ No newline at end of file diff --git a/apps/aifindr-evaluations-runner/evaluator.py b/apps/aifindr-evaluations-runner/evaluator.py index b78959c4ab..fb0c5357c5 100644 --- a/apps/aifindr-evaluations-runner/evaluator.py +++ b/apps/aifindr-evaluations-runner/evaluator.py @@ -3,9 +3,11 @@ from opik import Opik from opik.evaluation import evaluate from opik.evaluation.metrics import (Hallucination, Moderation, AnswerRelevance, ContextRecall, ContextPrecision) - -os.environ["OPIK_URL_OVERRIDE"] = "http://host.docker.internal:5173/api" -os.environ["OPENAI_API_KEY"] = "" +from config import settings + + +os.environ["OPIK_URL_OVERRIDE"] = settings.OPIK_URL +os.environ["OPENAI_API_KEY"] = settings.OPENAI_API_KEY client = Opik() dataset = client.get_dataset(name="BCP eval dataset") diff --git a/apps/aifindr-evaluations-runner/requirements.txt b/apps/aifindr-evaluations-runner/requirements.txt index 01d3c95a7b..30e6074fc8 100644 --- a/apps/aifindr-evaluations-runner/requirements.txt +++ b/apps/aifindr-evaluations-runner/requirements.txt @@ -1,4 +1,6 @@ fastapi==0.115.8 uvicorn==0.34.0 pydantic==2.10.6 -opik==1.4.10 \ No newline at end of file +opik==1.4.10 +pydantic-settings==2.7.1 +python-dotenv==1.0.1 \ No newline at end of file diff --git a/deployment/docker-compose/docker-compose.override.yaml b/deployment/docker-compose/docker-compose.override.yaml index 90cbe611c4..4bd3ba278d 100644 --- a/deployment/docker-compose/docker-compose.override.yaml +++ b/deployment/docker-compose/docker-compose.override.yaml @@ -1,3 +1,4 @@ +version: '3.8' services: mysql: ports: @@ -24,3 +25,7 @@ services: frontend: ports: - "5173:5173" # Exposing frontend server port to host + + aifindr-evaluations-runner: + env_file: + - ../../apps/aifindr-evaluations-runner/.env diff --git a/deployment/docker-compose/docker-compose.yaml b/deployment/docker-compose/docker-compose.yaml index 97ae8ffb7e..b176468e5e 100644 --- a/deployment/docker-compose/docker-compose.yaml +++ b/deployment/docker-compose/docker-compose.yaml @@ -1,4 +1,5 @@ name: opik +version: '3.8' services: mysql: @@ -120,6 +121,8 @@ services: build: context: ../../apps/aifindr-evaluations-runner dockerfile: Dockerfile + environment: + - OPIK_URL=http://host.docker.internal:5173/api ports: - "8001:8001" From 7f5ab0a3791fac0324c5ed1da442463379f97926 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro?= Date: Fri, 7 Feb 2025 12:38:32 +0000 Subject: [PATCH 3/9] Fix env vars --- apps/aifindr-evaluations-runner/config.py | 11 ---------- apps/aifindr-evaluations-runner/evaluator.py | 6 +----- apps/aifindr-evaluations-runner/settings.py | 21 +++++++++++++++++++ deployment/docker-compose/docker-compose.yaml | 2 -- 4 files changed, 22 insertions(+), 18 deletions(-) delete mode 100644 apps/aifindr-evaluations-runner/config.py create mode 100644 apps/aifindr-evaluations-runner/settings.py diff --git a/apps/aifindr-evaluations-runner/config.py b/apps/aifindr-evaluations-runner/config.py deleted file mode 100644 index 5058063356..0000000000 --- a/apps/aifindr-evaluations-runner/config.py +++ /dev/null @@ -1,11 +0,0 @@ -import os -from pydantic_settings import BaseSettings - -class Settings(BaseSettings): - OPIK_URL: str = os.getenv("OPIK_URL", "http://host.docker.internal:5173/api") - OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "") - - class Config: - env_file = ".env" - -settings = Settings() \ No newline at end 
of file diff --git a/apps/aifindr-evaluations-runner/evaluator.py b/apps/aifindr-evaluations-runner/evaluator.py index fb0c5357c5..71fbbce561 100644 --- a/apps/aifindr-evaluations-runner/evaluator.py +++ b/apps/aifindr-evaluations-runner/evaluator.py @@ -3,11 +3,7 @@ from opik import Opik from opik.evaluation import evaluate from opik.evaluation.metrics import (Hallucination, Moderation, AnswerRelevance, ContextRecall, ContextPrecision) -from config import settings - - -os.environ["OPIK_URL_OVERRIDE"] = settings.OPIK_URL -os.environ["OPENAI_API_KEY"] = settings.OPENAI_API_KEY +from settings import settings client = Opik() dataset = client.get_dataset(name="BCP eval dataset") diff --git a/apps/aifindr-evaluations-runner/settings.py b/apps/aifindr-evaluations-runner/settings.py new file mode 100644 index 0000000000..b424452d81 --- /dev/null +++ b/apps/aifindr-evaluations-runner/settings.py @@ -0,0 +1,21 @@ +import os +from pydantic_settings import BaseSettings +from pydantic import field_validator + +class Settings(BaseSettings): + OPIK_URL_OVERRIDE: str = "http://host.docker.internal:5173/api" + OPENAI_API_KEY: str = "" + + @field_validator("*") + def no_empty_strings(cls, v): + if isinstance(v, str) and not v: + raise ValueError(f"Field {v} cannot be empty") + return v + + class Config: + env_file = ".env" + extra = "ignore" # Permite ignorar variables extra + +settings = Settings() + +print("Settings: ", settings) \ No newline at end of file diff --git a/deployment/docker-compose/docker-compose.yaml b/deployment/docker-compose/docker-compose.yaml index b176468e5e..7ab85b16e1 100644 --- a/deployment/docker-compose/docker-compose.yaml +++ b/deployment/docker-compose/docker-compose.yaml @@ -121,8 +121,6 @@ services: build: context: ../../apps/aifindr-evaluations-runner dockerfile: Dockerfile - environment: - - OPIK_URL=http://host.docker.internal:5173/api ports: - "8001:8001" From 53544fd0451826d49aa45da74de0473173447326 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro?= Date: Mon, 10 Feb 2025 19:05:56 +0000 Subject: [PATCH 4/9] Use streaming response to get the retrieval responses to get the context for evaluation --- apps/aifindr-evaluations-runner/Dockerfile | 2 +- apps/aifindr-evaluations-runner/evaluator.py | 55 ++++++---- apps/aifindr-evaluations-runner/main.py | 20 ++-- .../requirements.txt | 4 +- apps/aifindr-evaluations-runner/settings.py | 5 +- apps/aifindr-evaluations-runner/workflows.py | 101 ++++++++++++++++++ 6 files changed, 149 insertions(+), 38 deletions(-) create mode 100644 apps/aifindr-evaluations-runner/workflows.py diff --git a/apps/aifindr-evaluations-runner/Dockerfile b/apps/aifindr-evaluations-runner/Dockerfile index b0d54404e6..a8b054b364 100644 --- a/apps/aifindr-evaluations-runner/Dockerfile +++ b/apps/aifindr-evaluations-runner/Dockerfile @@ -7,4 +7,4 @@ RUN pip install --no-cache-dir -r requirements.txt COPY . . 
-CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001", "--log-level", "debug"] \ No newline at end of file +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001"] \ No newline at end of file diff --git a/apps/aifindr-evaluations-runner/evaluator.py b/apps/aifindr-evaluations-runner/evaluator.py index 71fbbce561..885c35d62d 100644 --- a/apps/aifindr-evaluations-runner/evaluator.py +++ b/apps/aifindr-evaluations-runner/evaluator.py @@ -1,47 +1,58 @@ -import os - +import json from opik import Opik from opik.evaluation import evaluate -from opik.evaluation.metrics import (Hallucination, Moderation, AnswerRelevance, ContextRecall, ContextPrecision) -from settings import settings +from opik.evaluation.metrics import (IsJson, Hallucination, AnswerRelevance, ContextRecall, ContextPrecision) +from pydantic import BaseModel +from workflows import run_workflow client = Opik() -dataset = client.get_dataset(name="BCP eval dataset") -prompt = client.get_prompt(name="BCP eval prompt") -def evaluation_task(dataset_item): - print("Experiment item: ", dataset_item) - # your LLM application is called here +class EvaluationParams(BaseModel): + dataset_name: str + experiment_name: str + project_name: str + base_prompt_name: str + workflow: str + +def evaluation_task(dataset_item, workflow: str): + response_content = run_workflow(workflow, dataset_item['query']) + parsed_response = json.loads(response_content.response) + print(json.dumps(parsed_response)) result = { "input": dataset_item['query'], - "output": "Es madrid", - "context": [] + "output": response_content.response, + "context": response_content.context, } return result +def build_evaluation_task(params: EvaluationParams): + return lambda dataset_item: evaluation_task(dataset_item, params.workflow) -def execute_evaluation(dataset_name: str, experiment_name: str, project_name: str, base_prompt_name: str): - client = Opik() - dataset = client.get_dataset(name=dataset_name) - base_prompt = client.get_prompt(name=base_prompt_name) - metrics = [Hallucination(), Moderation(), AnswerRelevance(), ContextRecall(), ContextPrecision()] - print("Base prompt: ", base_prompt) +def execute_evaluation(params: EvaluationParams): + client = Opik() + dataset = client.get_dataset(name=params.dataset_name) + base_prompt = client.get_prompt(name=params.base_prompt_name) + metrics = [IsJson(), AnswerRelevance(), Hallucination(), ContextRecall()] + print("Base prompt: ", base_prompt.prompt) + # TODO: build the metric with the base prompt eval_results = evaluate( - experiment_name=experiment_name, + experiment_name=params.experiment_name, dataset=dataset, - task=evaluation_task, + task=build_evaluation_task(params), scoring_metrics=metrics, - project_name=project_name, + project_name=params.project_name, experiment_config={ - "pepe": "paco" + }, scoring_key_mapping={"expected_output": "criteria"}, - prompt=base_prompt + prompt=base_prompt, + task_threads=10, + nb_samples=1 ) return eval_results diff --git a/apps/aifindr-evaluations-runner/main.py b/apps/aifindr-evaluations-runner/main.py index 6c33f8a3cc..57df46383a 100644 --- a/apps/aifindr-evaluations-runner/main.py +++ b/apps/aifindr-evaluations-runner/main.py @@ -1,30 +1,26 @@ from fastapi import FastAPI, HTTPException from pydantic import BaseModel -from typing import Any, Dict +from typing import Any import logging -from evaluator import execute_evaluation +from evaluator import EvaluationParams, execute_evaluation # Configurar logging -logging.basicConfig(level=logging.DEBUG) 
+logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI() -class EvaluationRequest(BaseModel): - dataset_name: str - experiment_name: str - project_name: str - base_prompt_name: str class EvaluationResponse(BaseModel): - result: Dict[str, Any] + result: Any @app.post("/evaluations/run", response_model=EvaluationResponse) -async def run_evaluation(request: EvaluationRequest): +async def run_evaluation(request: EvaluationParams): try: logger.debug(f"Received request: {request}") - result = execute_evaluation(request.dataset_name, request.experiment_name, request.project_name, request.base_prompt_name) + result = execute_evaluation(request) return EvaluationResponse(result=result) except Exception as e: logger.error(f"Error processing request: {str(e)}") - raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file + raise HTTPException(status_code=500, detail=str(e)) + diff --git a/apps/aifindr-evaluations-runner/requirements.txt b/apps/aifindr-evaluations-runner/requirements.txt index 30e6074fc8..65e0b7189d 100644 --- a/apps/aifindr-evaluations-runner/requirements.txt +++ b/apps/aifindr-evaluations-runner/requirements.txt @@ -3,4 +3,6 @@ uvicorn==0.34.0 pydantic==2.10.6 opik==1.4.10 pydantic-settings==2.7.1 -python-dotenv==1.0.1 \ No newline at end of file +python-dotenv==1.0.1 +types-requests>=2.31.0.20240311 +sseclient-py==1.7.2 diff --git a/apps/aifindr-evaluations-runner/settings.py b/apps/aifindr-evaluations-runner/settings.py index b424452d81..d56f9177bc 100644 --- a/apps/aifindr-evaluations-runner/settings.py +++ b/apps/aifindr-evaluations-runner/settings.py @@ -1,15 +1,16 @@ -import os from pydantic_settings import BaseSettings from pydantic import field_validator class Settings(BaseSettings): OPIK_URL_OVERRIDE: str = "http://host.docker.internal:5173/api" OPENAI_API_KEY: str = "" + ELLMENTAL_API_URL: str = "" + ELLMENTAL_API_KEY: str = "" @field_validator("*") def no_empty_strings(cls, v): if isinstance(v, str) and not v: - raise ValueError(f"Field {v} cannot be empty") + raise ValueError("Field cannot be empty") return v class Config: diff --git a/apps/aifindr-evaluations-runner/workflows.py b/apps/aifindr-evaluations-runner/workflows.py new file mode 100644 index 0000000000..a47e46d1ba --- /dev/null +++ b/apps/aifindr-evaluations-runner/workflows.py @@ -0,0 +1,101 @@ +import time +import json +import requests +from settings import settings +from sseclient import SSEClient +from pydantic import BaseModel +from typing import Optional, List, Any + +MAX_RETRIES = 3 +RETRIEVAL_EVENT_ID_PREFIX = "similarity_search_by_text" +LLM_EVENT_ID_PREFIX = "llm" + + +class WorkflowResponse(BaseModel): + """ + Response model for workflow execution results. + + Attributes: + context: Retrieved context from similarity search + response: Combined LLM response + """ + context: Optional[List[Any]] = None + response: str = "" + + +def run_workflow(workflow: str, query: str) -> WorkflowResponse: + """ + Executes a workflow with the given query and handles retries. + + Args: + workflow: The workflow identifier/path + query: The query to process + + Returns: + WorkflowResponse: The processed response containing retrieval and LLM responses + """ + retry_count = 0 + while retry_count < MAX_RETRIES: + try: + return _make_workflow_request(workflow, query) + except Exception as e: + wait_time = 0.5 * (retry_count + 1) # Increasing delay between retries + print(f"Request failed with error: {e}. Waiting {wait_time}s before retrying... 
({retry_count + 1}/{MAX_RETRIES})") + retry_count += 1 + time.sleep(wait_time) + + raise Exception(f"Failed to complete request after {MAX_RETRIES} retries") + + +def _make_workflow_request(workflow: str, query: str) -> WorkflowResponse: + """ + Makes a POST request to the Ellmental API and processes SSE responses. + + Args: + workflow: The workflow identifier/path + query: The query to process + + Returns: + WorkflowResponse: A model containing the retrieval response and concatenated LLM responses + + Raises: + requests.exceptions.RequestException: If the request fails + """ + response = requests.post( + f"{settings.ELLMENTAL_API_URL}{workflow}", + stream=True, + headers={ + "Authorization": f"Bearer {settings.ELLMENTAL_API_KEY}", + "Content-Type": "application/json", + "Accept": "text/event-stream" + }, + json={ + "query": query, + "stream": "true" + }, + ) + + if not response.ok: + raise requests.exceptions.RequestException(f"Error calling Ellmental API: {response.text}") + + client = SSEClient(response) + retrieval_response = None + llm_response = '' + + for event in client.events(): + if not event.data: + continue + + try: + data = json.loads(event.data) + if event.id.startswith(RETRIEVAL_EVENT_ID_PREFIX): + retrieval_response = data['response']['hits'] + elif event.id.startswith(LLM_EVENT_ID_PREFIX) and 'content' in data['delta']['message']: + llm_response += data['delta']['message']['content'] + except json.JSONDecodeError as e: + print(f"Failed to parse event data: {event.data}. Error: {e}") + + return WorkflowResponse( + context=retrieval_response, + response=llm_response + ) \ No newline at end of file From c9bc5da77246f5eda2369f7eb734beff129611a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro?= Date: Tue, 11 Feb 2025 17:23:41 +0000 Subject: [PATCH 5/9] Add new metric "Follows criteria" --- apps/aifindr-evaluations-runner/evaluator.py | 19 +++-- .../metrics/follows_criteria.py | 70 +++++++++++++++++++ apps/aifindr-evaluations-runner/settings.py | 4 +- apps/aifindr-evaluations-runner/workflows.py | 10 +-- 4 files changed, 86 insertions(+), 17 deletions(-) create mode 100644 apps/aifindr-evaluations-runner/metrics/follows_criteria.py diff --git a/apps/aifindr-evaluations-runner/evaluator.py b/apps/aifindr-evaluations-runner/evaluator.py index 885c35d62d..a593bf7b2a 100644 --- a/apps/aifindr-evaluations-runner/evaluator.py +++ b/apps/aifindr-evaluations-runner/evaluator.py @@ -4,6 +4,7 @@ from opik.evaluation.metrics import (IsJson, Hallucination, AnswerRelevance, ContextRecall, ContextPrecision) from pydantic import BaseModel from workflows import run_workflow +from metrics.follows_criteria import FollowsCriteria client = Opik() @@ -18,8 +19,10 @@ class EvaluationParams(BaseModel): def evaluation_task(dataset_item, workflow: str): response_content = run_workflow(workflow, dataset_item['query']) - parsed_response = json.loads(response_content.response) - print(json.dumps(parsed_response)) + # parsed_response = json.loads(response_content.response) + # print(parsed_response) + # print(parsed_response.keys()) + # print(parsed_response['text_response']) result = { "input": dataset_item['query'], @@ -36,9 +39,11 @@ def execute_evaluation(params: EvaluationParams): client = Opik() dataset = client.get_dataset(name=params.dataset_name) base_prompt = client.get_prompt(name=params.base_prompt_name) - metrics = [IsJson(), AnswerRelevance(), Hallucination(), ContextRecall()] print("Base prompt: ", base_prompt.prompt) - # TODO: build the metric with the base prompt + if not base_prompt: 
+ raise ValueError(f"No base prompt found with name '{params.base_prompt_name}'") + # metrics = [IsJson(), AnswerRelevance(), Hallucination(), ContextRecall(), ContextPrecision(), FollowsCriteria(base_prompt.prompt)] + metrics = [Hallucination(), FollowsCriteria(base_prompt.prompt)] eval_results = evaluate( experiment_name=params.experiment_name, @@ -47,12 +52,12 @@ def execute_evaluation(params: EvaluationParams): scoring_metrics=metrics, project_name=params.project_name, experiment_config={ - + "base_prompt_version": base_prompt.commit, }, - scoring_key_mapping={"expected_output": "criteria"}, + scoring_key_mapping={"expected_output": "criteria"}, # Used by Context* related prompt=base_prompt, task_threads=10, - nb_samples=1 + nb_samples=2 ) return eval_results diff --git a/apps/aifindr-evaluations-runner/metrics/follows_criteria.py b/apps/aifindr-evaluations-runner/metrics/follows_criteria.py new file mode 100644 index 0000000000..9340cc66d9 --- /dev/null +++ b/apps/aifindr-evaluations-runner/metrics/follows_criteria.py @@ -0,0 +1,70 @@ +from opik.evaluation.metrics import base_metric, score_result +from opik.evaluation import models +from pydantic import BaseModel +import json +from typing import Any + +class FollowsCriteriaResult(BaseModel): + score: int + reason: str + +class FollowsCriteria(base_metric.BaseMetric): + """ + A metric that evaluates whether an LLM's output follows specified criteria based on a prompt template. + + This metric uses another LLM to judge if the output adheres to the criteria defined in the prompt template. + It returns a score between 0 and 1, where 1 indicates full compliance with the criteria. + + Args: + prompt_template: The template string containing the base prompt where the specific item criteria will be inserted. It must contain the varaible "{criteria}" and "{output}" somewhere + name: The name of the metric. Defaults to "Follows criteria" + model_name: The name of the LLM model to use for evaluation. Defaults to "gpt-4" + + Example: + >>> from metrics import FollowsCriteria + >>> prompt_template = "You should follow the criteria listed here: {criteria}. 
The response to evaluate is: {output}" + >>> # Assuming criteria is "The response should be a country" + >>> metric = FollowsCriteria(prompt_template=prompt_template) + >>> result = metric.score('Spain') + >>> print(result.value) + 1.0 + >>> print(result.reason) + The output perfectly follows the criteria by providing the name of the country Spain + """ + def __init__(self, prompt_template: str, name: str = "Follows criteria", model_name: str = "gpt-4o"): + self.name = name + self.llm_client = models.LiteLLMChatModel(model_name=model_name) + self.prompt_template = f""" +{prompt_template} +----- +Answer with a json with the following format: +{{{{ + "score": , + "reason": "" +}}}} + + """.lstrip().rstrip() + + def score(self, output: str, criteria: str, **ignored_kwargs: Any): + # Construct the prompt based on the output of the LLM + prompt = self.prompt_template.format( + output=output, + criteria=criteria + ) + + print("Prompt total: ", prompt) + # Generate and parse the response from the LLM + response = self.llm_client.generate_string(input=prompt, response_format=FollowsCriteriaResult) + + response_dict = json.loads(response) + return score_result.ScoreResult( + name=self.name, + value=response_dict["score"], + reason=response_dict["reason"] + ) + + def ascore(self, output: str, criteria: str, **ignored_kwargs: Any): + # TODO: Por ahora no funciona + print("SCORE ASYNC") + # await self._model.agenerate_string + return self.score(output, criteria) \ No newline at end of file diff --git a/apps/aifindr-evaluations-runner/settings.py b/apps/aifindr-evaluations-runner/settings.py index d56f9177bc..1d88ef8bef 100644 --- a/apps/aifindr-evaluations-runner/settings.py +++ b/apps/aifindr-evaluations-runner/settings.py @@ -17,6 +17,4 @@ class Config: env_file = ".env" extra = "ignore" # Permite ignorar variables extra -settings = Settings() - -print("Settings: ", settings) \ No newline at end of file +settings = Settings() \ No newline at end of file diff --git a/apps/aifindr-evaluations-runner/workflows.py b/apps/aifindr-evaluations-runner/workflows.py index a47e46d1ba..06382987a0 100644 --- a/apps/aifindr-evaluations-runner/workflows.py +++ b/apps/aifindr-evaluations-runner/workflows.py @@ -1,3 +1,4 @@ +import logging import time import json import requests @@ -10,15 +11,9 @@ RETRIEVAL_EVENT_ID_PREFIX = "similarity_search_by_text" LLM_EVENT_ID_PREFIX = "llm" +logger = logging.getLogger(__name__) class WorkflowResponse(BaseModel): - """ - Response model for workflow execution results. 
- - Attributes: - context: Retrieved context from similarity search - response: Combined LLM response - """ context: Optional[List[Any]] = None response: str = "" @@ -61,6 +56,7 @@ def _make_workflow_request(workflow: str, query: str) -> WorkflowResponse: Raises: requests.exceptions.RequestException: If the request fails """ + logger.info(f"Running workflow: {workflow} with query: {query}") response = requests.post( f"{settings.ELLMENTAL_API_URL}{workflow}", stream=True, From cb727045501b1db7aeef912cda4af3080cf7e0c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro?= Date: Tue, 11 Feb 2025 17:27:18 +0000 Subject: [PATCH 6/9] Fix evalaute params --- apps/aifindr-evaluations-runner/evaluator.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apps/aifindr-evaluations-runner/evaluator.py b/apps/aifindr-evaluations-runner/evaluator.py index a593bf7b2a..db854a7f68 100644 --- a/apps/aifindr-evaluations-runner/evaluator.py +++ b/apps/aifindr-evaluations-runner/evaluator.py @@ -54,10 +54,9 @@ def execute_evaluation(params: EvaluationParams): experiment_config={ "base_prompt_version": base_prompt.commit, }, - scoring_key_mapping={"expected_output": "criteria"}, # Used by Context* related + scoring_key_mapping={"expected_output": "criteria"}, # Used by Context* related metrics prompt=base_prompt, - task_threads=10, - nb_samples=2 + task_threads=20, ) return eval_results From d4380aec14f4f956ff238bdf01795d9086084640 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro?= Date: Tue, 11 Feb 2025 18:56:45 +0000 Subject: [PATCH 7/9] Improve prompt and add evaluations queue --- apps/aifindr-evaluations-runner/evaluator.py | 26 ++++--- apps/aifindr-evaluations-runner/main.py | 70 ++++++++++++++++--- .../metrics/follows_criteria.py | 15 ++-- 3 files changed, 83 insertions(+), 28 deletions(-) diff --git a/apps/aifindr-evaluations-runner/evaluator.py b/apps/aifindr-evaluations-runner/evaluator.py index db854a7f68..961ad6fdb9 100644 --- a/apps/aifindr-evaluations-runner/evaluator.py +++ b/apps/aifindr-evaluations-runner/evaluator.py @@ -1,21 +1,27 @@ -import json from opik import Opik from opik.evaluation import evaluate -from opik.evaluation.metrics import (IsJson, Hallucination, AnswerRelevance, ContextRecall, ContextPrecision) -from pydantic import BaseModel +from opik.evaluation.metrics import (Hallucination, ContextRecall, ContextPrecision) from workflows import run_workflow from metrics.follows_criteria import FollowsCriteria +from pydantic import BaseModel +from enum import Enum client = Opik() +class ExperimentStatus(Enum): + RUNNING = "running" + COMPLETED = "completed" # Not used yet + FAILED = "failed" # Not used yet + + class EvaluationParams(BaseModel): + task_id: str dataset_name: str experiment_name: str project_name: str base_prompt_name: str workflow: str - def evaluation_task(dataset_item, workflow: str): response_content = run_workflow(workflow, dataset_item['query']) @@ -36,16 +42,14 @@ def build_evaluation_task(params: EvaluationParams): def execute_evaluation(params: EvaluationParams): - client = Opik() dataset = client.get_dataset(name=params.dataset_name) base_prompt = client.get_prompt(name=params.base_prompt_name) - print("Base prompt: ", base_prompt.prompt) if not base_prompt: raise ValueError(f"No base prompt found with name '{params.base_prompt_name}'") - # metrics = [IsJson(), AnswerRelevance(), Hallucination(), ContextRecall(), ContextPrecision(), FollowsCriteria(base_prompt.prompt)] - metrics = [Hallucination(), FollowsCriteria(base_prompt.prompt)] - 
eval_results = evaluate( + metrics = [FollowsCriteria(base_prompt.prompt), Hallucination(), ContextRecall(), ContextPrecision()] + + evaluate( experiment_name=params.experiment_name, dataset=dataset, task=build_evaluation_task(params), @@ -53,10 +57,10 @@ def execute_evaluation(params: EvaluationParams): project_name=params.project_name, experiment_config={ "base_prompt_version": base_prompt.commit, + "task_id": params.task_id }, scoring_key_mapping={"expected_output": "criteria"}, # Used by Context* related metrics prompt=base_prompt, task_threads=20, + nb_samples=3 ) - - return eval_results diff --git a/apps/aifindr-evaluations-runner/main.py b/apps/aifindr-evaluations-runner/main.py index 57df46383a..b9fede28ff 100644 --- a/apps/aifindr-evaluations-runner/main.py +++ b/apps/aifindr-evaluations-runner/main.py @@ -1,25 +1,75 @@ from fastapi import FastAPI, HTTPException from pydantic import BaseModel -from typing import Any import logging -from evaluator import EvaluationParams, execute_evaluation +import asyncio +import uuid +from evaluator import EvaluationParams, ExperimentStatus, execute_evaluation -# Configurar logging +# Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI() -class EvaluationResponse(BaseModel): - result: Any +TASK_QUEUE: asyncio.Queue[EvaluationParams] = asyncio.Queue(maxsize=100) # Maximum number of evaluations in queue +MAX_CONCURRENT_TASKS = 5 # Number of concurrent evaluations -@app.post("/evaluations/run", response_model=EvaluationResponse) -async def run_evaluation(request: EvaluationParams): +class RunEvaluationsRequest(BaseModel): + dataset_name: str + experiment_name: str + project_name: str + base_prompt_name: str + workflow: str + +class RunEvaluationsResponse(BaseModel): + status: str + task_id: str + +async def process_queue(): + """Background task to process queued evaluations""" + while True: + # Get a task from the queue + evaluation_params = await TASK_QUEUE.get() + try: + logger.info(f"PROCESSING EVALUATION: {evaluation_params.task_id}") + execute_evaluation(evaluation_params) + except Exception as e: + logger.error(f"Error processing evaluation: {str(e)}") + finally: + # Mark the task as done + TASK_QUEUE.task_done() + +@app.on_event("startup") +async def startup_event(): + # Start background workers to process the queue + for _ in range(MAX_CONCURRENT_TASKS): + asyncio.create_task(process_queue()) + +@app.post("/evaluations/run", response_model=RunEvaluationsResponse) +async def run_evaluation(request: RunEvaluationsRequest): try: - logger.debug(f"Received request: {request}") + # Generate task ID + task_id = str(uuid.uuid4()) + + # Create EvaluationParams with all fields from request plus task_id + evaluation_params = EvaluationParams( + task_id=task_id, + dataset_name=request.dataset_name, + experiment_name=request.experiment_name, + project_name=request.project_name, + base_prompt_name=request.base_prompt_name, + workflow=request.workflow + ) + + logger.info(f"Try adding evaluation task to queue: {evaluation_params}") + try: + TASK_QUEUE.put_nowait(evaluation_params) + logger.info(f"Evaluation task added to queue: {evaluation_params}") + except asyncio.QueueFull: + logger.error("Queue is full. Cannot add more tasks.") + raise HTTPException(status_code=503, detail="Server is currently at maximum capacity. 
Please try again later.") - result = execute_evaluation(request) - return EvaluationResponse(result=result) + return RunEvaluationsResponse(status=ExperimentStatus.RUNNING.value, task_id=task_id) except Exception as e: logger.error(f"Error processing request: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) diff --git a/apps/aifindr-evaluations-runner/metrics/follows_criteria.py b/apps/aifindr-evaluations-runner/metrics/follows_criteria.py index 9340cc66d9..88d7d73013 100644 --- a/apps/aifindr-evaluations-runner/metrics/follows_criteria.py +++ b/apps/aifindr-evaluations-runner/metrics/follows_criteria.py @@ -38,11 +38,18 @@ def __init__(self, prompt_template: str, name: str = "Follows criteria", model_n {prompt_template} ----- Answer with a json with the following format: + {{{{ - "score": , + "score": , "reason": "" }}}} +Follow this instructions to fill the score: +- **0.0**: The response does not follow the criteria at all. +- **0.1 - 0.3**: The response is somewhat related to the criteria, but it doesn't follow it. +- **0.4 - 0.6**: The response partially follows the criteria, following some points, but not others. Or those points that follow are only partially correct. +- **0.7 - 0.9**: The response either fulfills all criteria but it is lacking details or misses between 10-30% of the criteria points. +- **1.0**: The response perfectly follows the criteria completely. """.lstrip().rstrip() def score(self, output: str, criteria: str, **ignored_kwargs: Any): @@ -62,9 +69,3 @@ def score(self, output: str, criteria: str, **ignored_kwargs: Any): value=response_dict["score"], reason=response_dict["reason"] ) - - def ascore(self, output: str, criteria: str, **ignored_kwargs: Any): - # TODO: Por ahora no funciona - print("SCORE ASYNC") - # await self._model.agenerate_string - return self.score(output, criteria) \ No newline at end of file From 9ee001285222e896ea2994ff2051e09461e84b5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro?= Date: Tue, 11 Feb 2025 18:57:45 +0000 Subject: [PATCH 8/9] Remove nb_samples --- apps/aifindr-evaluations-runner/evaluator.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/apps/aifindr-evaluations-runner/evaluator.py b/apps/aifindr-evaluations-runner/evaluator.py index 961ad6fdb9..7c1be44d97 100644 --- a/apps/aifindr-evaluations-runner/evaluator.py +++ b/apps/aifindr-evaluations-runner/evaluator.py @@ -61,6 +61,5 @@ def execute_evaluation(params: EvaluationParams): }, scoring_key_mapping={"expected_output": "criteria"}, # Used by Context* related metrics prompt=base_prompt, - task_threads=20, - nb_samples=3 + task_threads=20 ) From d180a12a121fd43d162eeb7bd160eecbe81347c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro?= Date: Wed, 12 Feb 2025 12:31:12 +0000 Subject: [PATCH 9/9] Evaluations can now run concurrently --- apps/aifindr-evaluations-runner/Dockerfile | 2 +- apps/aifindr-evaluations-runner/main.py | 14 ++++++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/apps/aifindr-evaluations-runner/Dockerfile b/apps/aifindr-evaluations-runner/Dockerfile index a8b054b364..c6332d79de 100644 --- a/apps/aifindr-evaluations-runner/Dockerfile +++ b/apps/aifindr-evaluations-runner/Dockerfile @@ -7,4 +7,4 @@ RUN pip install --no-cache-dir -r requirements.txt COPY . . 
-CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001"] \ No newline at end of file +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001", "--workers", "4"] \ No newline at end of file diff --git a/apps/aifindr-evaluations-runner/main.py b/apps/aifindr-evaluations-runner/main.py index b9fede28ff..ffa065709a 100644 --- a/apps/aifindr-evaluations-runner/main.py +++ b/apps/aifindr-evaluations-runner/main.py @@ -11,7 +11,7 @@ app = FastAPI() -TASK_QUEUE: asyncio.Queue[EvaluationParams] = asyncio.Queue(maxsize=100) # Maximum number of evaluations in queue +TASK_QUEUE: asyncio.Queue[EvaluationParams] = asyncio.Queue(maxsize=10) # Maximum number of evaluations in queue MAX_CONCURRENT_TASKS = 5 # Number of concurrent evaluations class RunEvaluationsRequest(BaseModel): @@ -31,12 +31,12 @@ async def process_queue(): # Get a task from the queue evaluation_params = await TASK_QUEUE.get() try: - logger.info(f"PROCESSING EVALUATION: {evaluation_params.task_id}") - execute_evaluation(evaluation_params) + # Run execute_evaluation in a thread pool so that it doesn't block the event loop + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, execute_evaluation, evaluation_params) except Exception as e: logger.error(f"Error processing evaluation: {str(e)}") finally: - # Mark the task as done TASK_QUEUE.task_done() @app.on_event("startup") @@ -50,7 +50,6 @@ async def run_evaluation(request: RunEvaluationsRequest): try: # Generate task ID task_id = str(uuid.uuid4()) - # Create EvaluationParams with all fields from request plus task_id evaluation_params = EvaluationParams( task_id=task_id, @@ -61,16 +60,15 @@ async def run_evaluation(request: RunEvaluationsRequest): workflow=request.workflow ) - logger.info(f"Try adding evaluation task to queue: {evaluation_params}") try: TASK_QUEUE.put_nowait(evaluation_params) logger.info(f"Evaluation task added to queue: {evaluation_params}") except asyncio.QueueFull: - logger.error("Queue is full. Cannot add more tasks.") + logger.error(f"Queue is full. Evaluation task not added to the queue: {evaluation_params}") raise HTTPException(status_code=503, detail="Server is currently at maximum capacity. Please try again later.") return RunEvaluationsResponse(status=ExperimentStatus.RUNNING.value, task_id=task_id) except Exception as e: logger.error(f"Error processing request: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) - + \ No newline at end of file