Create evaluation runner #1

Open · wants to merge 9 commits into base: main
2 changes: 2 additions & 0 deletions apps/aifindr-evaluations-runner/.env.example
@@ -0,0 +1,2 @@
OPIK_URL_OVERRIDE=http://host.docker.internal:5173/api
OPENAI_API_KEY=your-api-key-here
4 changes: 4 additions & 0 deletions apps/aifindr-evaluations-runner/.gitignore
@@ -0,0 +1,4 @@
venv/
__pycache__/
*.pyc
.env
10 changes: 10 additions & 0 deletions apps/aifindr-evaluations-runner/Dockerfile
@@ -0,0 +1,10 @@
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001", "--workers", "4"]
65 changes: 65 additions & 0 deletions apps/aifindr-evaluations-runner/evaluator.py
@@ -0,0 +1,65 @@
from opik import Opik
from opik.evaluation import evaluate
from opik.evaluation.metrics import (Hallucination, ContextRecall, ContextPrecision)
from workflows import run_workflow
from metrics.follows_criteria import FollowsCriteria
from pydantic import BaseModel
from enum import Enum

client = Opik()

class ExperimentStatus(Enum):
RUNNING = "running"
COMPLETED = "completed" # Not used yet
FAILED = "failed" # Not used yet


class EvaluationParams(BaseModel):
task_id: str
dataset_name: str
experiment_name: str
project_name: str
base_prompt_name: str
workflow: str

def evaluation_task(dataset_item, workflow: str):
response_content = run_workflow(workflow, dataset_item['query'])

# parsed_response = json.loads(response_content.response)
# print(parsed_response)
# print(parsed_response.keys())
# print(parsed_response['text_response'])

result = {
"input": dataset_item['query'],
"output": response_content.response,
"context": response_content.context,
}
return result

def build_evaluation_task(params: EvaluationParams):
return lambda dataset_item: evaluation_task(dataset_item, params.workflow)


def execute_evaluation(params: EvaluationParams):
dataset = client.get_dataset(name=params.dataset_name)
base_prompt = client.get_prompt(name=params.base_prompt_name)
if not base_prompt:
raise ValueError(f"No base prompt found with name '{params.base_prompt_name}'")

metrics = [FollowsCriteria(base_prompt.prompt), Hallucination(), ContextRecall(), ContextPrecision()]

evaluate(
experiment_name=params.experiment_name,
dataset=dataset,
task=build_evaluation_task(params),
scoring_metrics=metrics,
project_name=params.project_name,
experiment_config={
"base_prompt_version": base_prompt.commit,
"task_id": params.task_id
},
scoring_key_mapping={"expected_output": "criteria"}, # Used by Context* related metrics
prompt=base_prompt,
task_threads=20
)
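
For local debugging, the evaluator can be invoked directly, bypassing the FastAPI queue. A minimal sketch, assuming the Opik server is reachable and that the dataset and base prompt named below already exist (all names here are hypothetical):

```python
# Hypothetical direct call to execute_evaluation; all names are placeholders.
import uuid

from evaluator import EvaluationParams, execute_evaluation

params = EvaluationParams(
    task_id=str(uuid.uuid4()),
    dataset_name="faq-dataset",               # assumed to exist in Opik
    experiment_name="faq-baseline-run",
    project_name="aifindr-evaluations",
    base_prompt_name="base-criteria-prompt",  # assumed to exist in Opik
    workflow="/workflows/faq_search",         # hypothetical Ellmental workflow path
)

execute_evaluation(params)  # blocks until every dataset item has been scored
```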
74 changes: 74 additions & 0 deletions apps/aifindr-evaluations-runner/main.py
@@ -0,0 +1,74 @@
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import logging
import asyncio
import uuid
from evaluator import EvaluationParams, ExperimentStatus, execute_evaluation

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

TASK_QUEUE: asyncio.Queue[EvaluationParams] = asyncio.Queue(maxsize=10) # Maximum number of evaluations in queue
MAX_CONCURRENT_TASKS = 5 # Number of concurrent evaluations

class RunEvaluationsRequest(BaseModel):
dataset_name: str
experiment_name: str
project_name: str
base_prompt_name: str
workflow: str

class RunEvaluationsResponse(BaseModel):
status: str
task_id: str

async def process_queue():
"""Background task to process queued evaluations"""
while True:
# Get a task from the queue
evaluation_params = await TASK_QUEUE.get()
try:
# Run execute_evaluation in a thread pool so that it doesn't block the event loop
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, execute_evaluation, evaluation_params)
except Exception as e:
logger.error(f"Error processing evaluation: {str(e)}")
finally:
TASK_QUEUE.task_done()

@app.on_event("startup")
async def startup_event():
# Start background workers to process the queue
for _ in range(MAX_CONCURRENT_TASKS):
asyncio.create_task(process_queue())

@app.post("/evaluations/run", response_model=RunEvaluationsResponse)
async def run_evaluation(request: RunEvaluationsRequest):
try:
# Generate task ID
task_id = str(uuid.uuid4())
# Create EvaluationParams with all fields from request plus task_id
evaluation_params = EvaluationParams(
task_id=task_id,
dataset_name=request.dataset_name,
experiment_name=request.experiment_name,
project_name=request.project_name,
base_prompt_name=request.base_prompt_name,
workflow=request.workflow
)

try:
TASK_QUEUE.put_nowait(evaluation_params)
logger.info(f"Evaluation task added to queue: {evaluation_params}")
except asyncio.QueueFull:
logger.error(f"Queue is full. Evaluation task not added to the queue: {evaluation_params}")
raise HTTPException(status_code=503, detail="Server is currently at maximum capacity. Please try again later.")

return RunEvaluationsResponse(status=ExperimentStatus.RUNNING.value, task_id=task_id)
    except HTTPException:
        # Propagate HTTP errors (e.g. the 503 raised when the queue is full) unchanged
        raise
    except Exception as e:
        logger.error(f"Error processing request: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
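
Once the container is up, the endpoint can be exercised with a plain HTTP client. A hedged sketch (port 8001 as in the Dockerfile; every field value below is a placeholder):

```python
# Sketch of a client call to POST /evaluations/run; values are illustrative only.
import requests

payload = {
    "dataset_name": "faq-dataset",
    "experiment_name": "faq-baseline-run",
    "project_name": "aifindr-evaluations",
    "base_prompt_name": "base-criteria-prompt",
    "workflow": "/workflows/faq_search",
}

resp = requests.post("http://localhost:8001/evaluations/run", json=payload, timeout=10)
resp.raise_for_status()  # a 503 here means the in-memory queue is full
print(resp.json())       # e.g. {"status": "running", "task_id": "..."}
```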

71 changes: 71 additions & 0 deletions apps/aifindr-evaluations-runner/metrics/follows_criteria.py
@@ -0,0 +1,71 @@
from opik.evaluation.metrics import base_metric, score_result
from opik.evaluation import models
from pydantic import BaseModel
import json
from typing import Any

class FollowsCriteriaResult(BaseModel):
    score: float
reason: str

class FollowsCriteria(base_metric.BaseMetric):
"""
A metric that evaluates whether an LLM's output follows specified criteria based on a prompt template.

This metric uses another LLM to judge if the output adheres to the criteria defined in the prompt template.
It returns a score between 0 and 1, where 1 indicates full compliance with the criteria.

Args:
        prompt_template: The template string containing the base prompt into which each item's criteria will be inserted. It must contain the variables "{criteria}" and "{output}".
        name: The name of the metric. Defaults to "Follows criteria".
        model_name: The name of the LLM model to use for evaluation. Defaults to "gpt-4o".

Example:
>>> from metrics import FollowsCriteria
>>> prompt_template = "You should follow the criteria listed here: {criteria}. The response to evaluate is: {output}"
>>> # Assuming criteria is "The response should be a country"
>>> metric = FollowsCriteria(prompt_template=prompt_template)
        >>> result = metric.score(output='Spain', criteria='The response should be a country')
>>> print(result.value)
1.0
>>> print(result.reason)
The output perfectly follows the criteria by providing the name of the country Spain
"""
def __init__(self, prompt_template: str, name: str = "Follows criteria", model_name: str = "gpt-4o"):
self.name = name
self.llm_client = models.LiteLLMChatModel(model_name=model_name)
self.prompt_template = f"""
{prompt_template}
-----
Answer with a JSON object in the following format:

{{{{
"score": <score float number between 0.0 and 1.0>,
"reason": "<reason for the score>"
}}}}

Follow these instructions to assign the score:
- **0.0**: The response does not follow the criteria at all.
- **0.1 - 0.3**: The response is somewhat related to the criteria, but it doesn't follow it.
- **0.4 - 0.6**: The response partially follows the criteria, meeting some points but not others, or the points it does meet are only partially correct.
- **0.7 - 0.9**: The response fulfills all the criteria but lacks detail, or misses 10-30% of the criteria points.
- **1.0**: The response follows the criteria completely.
""".lstrip().rstrip()

def score(self, output: str, criteria: str, **ignored_kwargs: Any):
# Construct the prompt based on the output of the LLM
prompt = self.prompt_template.format(
output=output,
criteria=criteria
)

        print("Full prompt: ", prompt)
# Generate and parse the response from the LLM
response = self.llm_client.generate_string(input=prompt, response_format=FollowsCriteriaResult)

response_dict = json.loads(response)
return score_result.ScoreResult(
name=self.name,
value=response_dict["score"],
reason=response_dict["reason"]
)
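
A standalone usage sketch of the metric, assuming an OpenAI key is available to LiteLLM; the template and criteria strings are illustrative only:

```python
# Hypothetical standalone check of FollowsCriteria; the strings below are placeholders.
from metrics.follows_criteria import FollowsCriteria

template = (
    "You should follow the criteria listed here: {criteria}. "
    "The response to evaluate is: {output}"
)
metric = FollowsCriteria(prompt_template=template)

result = metric.score(
    output="Spain",
    criteria="The response should be the name of a country",
)
print(result.value, result.reason)  # e.g. 1.0 "The output names a country"
```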
8 changes: 8 additions & 0 deletions apps/aifindr-evaluations-runner/requirements.txt
@@ -0,0 +1,8 @@
fastapi==0.115.8
uvicorn==0.34.0
pydantic==2.10.6
opik==1.4.10
pydantic-settings==2.7.1
python-dotenv==1.0.1
types-requests>=2.31.0.20240311
sseclient-py==1.7.2
20 changes: 20 additions & 0 deletions apps/aifindr-evaluations-runner/settings.py
@@ -0,0 +1,20 @@
from pydantic_settings import BaseSettings
from pydantic import field_validator

class Settings(BaseSettings):
OPIK_URL_OVERRIDE: str = "http://host.docker.internal:5173/api"
OPENAI_API_KEY: str = ""
ELLMENTAL_API_URL: str = ""
ELLMENTAL_API_KEY: str = ""

@field_validator("*")
def no_empty_strings(cls, v):
if isinstance(v, str) and not v:
raise ValueError("Field cannot be empty")
return v

class Config:
env_file = ".env"
        extra = "ignore"  # Ignore extra environment variables

settings = Settings()
97 changes: 97 additions & 0 deletions apps/aifindr-evaluations-runner/workflows.py
@@ -0,0 +1,97 @@
import logging
import time
import json
import requests
from settings import settings
from sseclient import SSEClient
from pydantic import BaseModel
from typing import Optional, List, Any

MAX_RETRIES = 3
RETRIEVAL_EVENT_ID_PREFIX = "similarity_search_by_text"
LLM_EVENT_ID_PREFIX = "llm"

logger = logging.getLogger(__name__)

class WorkflowResponse(BaseModel):
context: Optional[List[Any]] = None
response: str = ""


def run_workflow(workflow: str, query: str) -> WorkflowResponse:
"""
Executes a workflow with the given query and handles retries.

Args:
workflow: The workflow identifier/path
query: The query to process

Returns:
WorkflowResponse: The processed response containing retrieval and LLM responses
"""
retry_count = 0
while retry_count < MAX_RETRIES:
try:
return _make_workflow_request(workflow, query)
except Exception as e:
wait_time = 0.5 * (retry_count + 1) # Increasing delay between retries
print(f"Request failed with error: {e}. Waiting {wait_time}s before retrying... ({retry_count + 1}/{MAX_RETRIES})")
retry_count += 1
time.sleep(wait_time)

raise Exception(f"Failed to complete request after {MAX_RETRIES} retries")


def _make_workflow_request(workflow: str, query: str) -> WorkflowResponse:
"""
Makes a POST request to the Ellmental API and processes SSE responses.

Args:
workflow: The workflow identifier/path
query: The query to process

Returns:
WorkflowResponse: A model containing the retrieval response and concatenated LLM responses

Raises:
requests.exceptions.RequestException: If the request fails
"""
logger.info(f"Running workflow: {workflow} with query: {query}")
response = requests.post(
f"{settings.ELLMENTAL_API_URL}{workflow}",
stream=True,
headers={
"Authorization": f"Bearer {settings.ELLMENTAL_API_KEY}",
"Content-Type": "application/json",
"Accept": "text/event-stream"
},
json={
"query": query,
"stream": "true"
},
)

if not response.ok:
raise requests.exceptions.RequestException(f"Error calling Ellmental API: {response.text}")

client = SSEClient(response)
retrieval_response = None
llm_response = ''

for event in client.events():
if not event.data:
continue

try:
data = json.loads(event.data)
if event.id.startswith(RETRIEVAL_EVENT_ID_PREFIX):
retrieval_response = data['response']['hits']
elif event.id.startswith(LLM_EVENT_ID_PREFIX) and 'content' in data['delta']['message']:
llm_response += data['delta']['message']['content']
except json.JSONDecodeError as e:
print(f"Failed to parse event data: {event.data}. Error: {e}")

return WorkflowResponse(
context=retrieval_response,
response=llm_response
)
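
The parser above only reacts to two event families. The snippet below sketches the event shapes it expects, reconstructed from the parsing code; the actual Ellmental payloads may differ:

```python
# Event shapes inferred from _make_workflow_request's parsing logic; illustrative only.
retrieval_event = {
    "id": "similarity_search_by_text_0",   # matched via RETRIEVAL_EVENT_ID_PREFIX
    "data": {"response": {"hits": [{"text": "a retrieved context chunk"}]}},
}
llm_event = {
    "id": "llm_0",                         # matched via LLM_EVENT_ID_PREFIX
    "data": {"delta": {"message": {"content": "partial answer text"}}},
}
# run_workflow() concatenates every LLM delta into WorkflowResponse.response and
# keeps the most recent retrieval hits list as WorkflowResponse.context.
```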
5 changes: 5 additions & 0 deletions deployment/docker-compose/docker-compose.override.yaml
@@ -1,3 +1,4 @@
version: '3.8'
services:
mysql:
ports:
@@ -24,3 +25,7 @@ services:
frontend:
ports:
- "5173:5173" # Exposing frontend server port to host

aifindr-evaluations-runner:
env_file:
- ../../apps/aifindr-evaluations-runner/.env
8 changes: 8 additions & 0 deletions deployment/docker-compose/docker-compose.yaml
@@ -1,4 +1,5 @@
name: opik
version: '3.8'

services:
mysql:
@@ -116,6 +117,13 @@ services:
backend:
condition: service_started

aifindr-evaluations-runner:
build:
context: ../../apps/aifindr-evaluations-runner
dockerfile: Dockerfile
ports:
- "8001:8001"

networks:
default:
