Skip to content

Commit

Permalink
Refactor Paginated GETs for better UX (#73)
Browse files Browse the repository at this point in the history
* fix: remove opentelemtry code and make deriver workers configurable

* feat(storage) change get route for messages and metamessages to use POST for options

* feat(storage) Change remaining paginated gets to POST requests with options
  • Loading branch information
VVoruganti authored Oct 18, 2024
1 parent a928ede commit 7efd7c5
Show file tree
Hide file tree
Showing 23 changed files with 287 additions and 3,407 deletions.
20 changes: 3 additions & 17 deletions .env.template
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ OPENAI_API_KEY=

# Logging

OPENTELEMETRY_ENABLED=false # Set to true to enable OpenTelemetry logging and tracing
SENTRY_ENABLED=false # Set to true to enable Sentry logging and tracing
LOGFIRE_TOKEN= # optional logfire config

Expand All @@ -15,21 +14,8 @@ LOGFIRE_TOKEN= # optional logfire config
USE_AUTH_SERVICE=false
AUTH_SERVICE_URL=http://localhost:8001

## Sentry
# Sentry
SENTRY_DSN=

## Open Telemetry

OTEL_SERVICE_NAME=honcho
OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true
OTEL_PYTHON_LOG_CORRELATION=true

OTEL_PYTHON_LOG_LEVEL=
OTEL_EXPORTER_OTLP_PROTOCOL=
OTEL_EXPORTER_OTLP_ENDPOINT=
OTEL_EXPORTER_OTLP_HEADERS=
OTEL_RESOURCE_ATTRIBUTES=

DEBUG_LOG_OTEL_TO_PROVIDER=false
DEBUG_LOG_OTEL_TO_CONSOLE=true

# Deriver
DERIVER_WORKERS=1
2,713 changes: 0 additions & 2,713 deletions poetry.lock

This file was deleted.

5 changes: 0 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ dependencies = [
"greenlet>=3.0.3",
"psycopg[binary]>=3.1.19",
"httpx>=0.27.0",
"opentelemetry-instrumentation-fastapi>=0.45b0",
"opentelemetry-sdk>=1.24.0",
"opentelemetry-exporter-otlp>=1.24.0",
"opentelemetry-instrumentation-sqlalchemy>=0.45b0",
"opentelemetry-instrumentation-logging>=0.45b0",
"rich>=13.7.1",
"openai>=1.43.0",
"anthropic>=0.36.0",
Expand Down
7 changes: 7 additions & 0 deletions src/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,9 @@ async def get_messages(
reverse: Optional[bool] = False,
filter: Optional[dict] = None,
) -> Select:
print("============")
print(filter)
print("============")
stmt = (
select(models.Message)
.join(models.Session, models.Session.public_id == models.Message.session_id)
Expand All @@ -337,6 +340,10 @@ async def get_messages(
else:
stmt = stmt.order_by(models.Message.created_at)

print("============")
print(stmt)
print("============")

return stmt


Expand Down
3 changes: 0 additions & 3 deletions src/deriver/__init__.py

This file was deleted.

6 changes: 4 additions & 2 deletions src/deriver/queue.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import asyncio
import os
from collections.abc import Sequence
from datetime import datetime, timedelta
from typing import Any, Optional, Sequence
from typing import Any, Optional

import sentry_sdk
from dotenv import load_dotenv
Expand Down Expand Up @@ -154,6 +155,7 @@ async def main():
AsyncioIntegration(),
],
)
semaphore = asyncio.Semaphore(2) # Limit to 5 concurrent dequeuing operations
workers = int(os.getenv("DERIVER_WORKERS", 1)) + 1
semaphore = asyncio.Semaphore(workers) # Limit to 5 concurrent dequeuing operations
queue_empty_flag = asyncio.Event() # Event to signal when the queue is empty
await polling_loop(semaphore, queue_empty_flag)
171 changes: 0 additions & 171 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,10 @@
import json
import logging
import os
from contextlib import asynccontextmanager

import sentry_sdk
from fastapi import APIRouter, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import PlainTextResponse
from fastapi_pagination import add_pagination
from opentelemetry import trace
from opentelemetry._logs import (
set_logger_provider,
)
from opentelemetry.exporter.otlp.proto.http._log_exporter import (
OTLPLogExporter,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter,
)
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
BatchLogRecordProcessor,
ConsoleLogExporter,
SimpleLogRecordProcessor,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
ConsoleSpanExporter,
SimpleSpanProcessor,
)
from starlette.exceptions import HTTPException as StarletteHTTPException

from src.routers import (
apps,
Expand All @@ -48,127 +18,6 @@

from .db import engine, scaffold_db

# Otel Setup

DEBUG_LOG_OTEL_TO_PROVIDER = (
os.getenv("DEBUG_LOG_OTEL_TO_PROVIDER", "False").lower() == "true"
)
DEBUG_LOG_OTEL_TO_CONSOLE = (
os.getenv("DEBUG_LOG_OTEL_TO_CONSOLE", "False").lower() == "true"
)


def otel_get_env_vars():
otel_http_headers = {}
try:
decoded_http_headers = os.getenv("OTEL_EXPORTER_OTLP_HEADERS", "")
key_values = decoded_http_headers.split(",")
for key_value in key_values:
key, value = key_value.split("=")
otel_http_headers[key] = value

except Exception as e:
print(f"Error parsing OTEL_ENDPOINT_HTTP_HEADERS: {str(e)}")
otel_endpoint_url = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", None)

return otel_endpoint_url, otel_http_headers


def otel_trace_init():
trace.set_tracer_provider(
TracerProvider(
resource=Resource.create({}),
),
)
if DEBUG_LOG_OTEL_TO_PROVIDER:
otel_endpoint_url, otel_http_headers = otel_get_env_vars()
if otel_endpoint_url is not None:
otel_endpoint_url = otel_endpoint_url + "/v1/traces"
otlp_span_exporter = OTLPSpanExporter(
endpoint=otel_endpoint_url, headers=otel_http_headers
)
trace.get_tracer_provider().add_span_processor( # type: ignore
BatchSpanProcessor(otlp_span_exporter)
)
if DEBUG_LOG_OTEL_TO_CONSOLE:
trace.get_tracer_provider().add_span_processor( # type: ignore
SimpleSpanProcessor(ConsoleSpanExporter())
)


def otel_logging_init():
# ------------Logging
# Set logging level
# CRITICAL = 50
# ERROR = 40
# WARNING = 30
# INFO = 20
# DEBUG = 10
# NOTSET = 0
# default = WARNING
log_level = str(os.getenv("OTEL_PYTHON_LOG_LEVEL", "INFO")).upper()
if log_level == "CRITICAL":
log_level = logging.CRITICAL
print(f"Using log level: CRITICAL / {log_level}")
elif log_level == "ERROR":
log_level = logging.ERROR
print(f"Using log level: ERROR / {log_level}")
elif log_level == "WARNING":
log_level = logging.WARNING
print(f"Using log level: WARNING / {log_level}")
elif log_level == "INFO":
log_level = logging.INFO
print(f"Using log level: INFO / {log_level}")
elif log_level == "DEBUG":
log_level = logging.DEBUG
print(f"Using log level: DEBUG / {log_level}")
elif log_level == "NOTSET":
log_level = logging.INFO
print(f"Using log level: NOTSET / {log_level}")
# ------------ Opentelemetry logging initialization

logger_provider = LoggerProvider(resource=Resource.create({}))
set_logger_provider(logger_provider)
if DEBUG_LOG_OTEL_TO_CONSOLE:
console_log_exporter = ConsoleLogExporter()
logger_provider.add_log_record_processor(
SimpleLogRecordProcessor(console_log_exporter)
)
if DEBUG_LOG_OTEL_TO_PROVIDER:
otel_endpoint_url, otel_http_headers = otel_get_env_vars()
if otel_endpoint_url is not None:
otel_endpoint_url = otel_endpoint_url + "/v1/logs"
otlp_log_exporter = OTLPLogExporter(
endpoint=otel_endpoint_url, headers=otel_http_headers
)
logger_provider.add_log_record_processor(
BatchLogRecordProcessor(otlp_log_exporter)
)

# otel_log_handler = FormattedLoggingHandler(logger_provider=logger_provider)
otel_log_handler = LoggingHandler(
level=logging.NOTSET, logger_provider=logger_provider
)

otel_log_handler.setLevel(log_level)
# This has to be called first before logger.getLogger().addHandler() so that it can call logging.basicConfig first to set the logging format
# based on the environment variable OTEL_PYTHON_LOG_FORMAT
LoggingInstrumentor(log_level=log_level).instrument(log_level=log_level)
# logFormatter = logging.Formatter(os.getenv("OTEL_PYTHON_LOG_FORMAT", None))
# otel_log_handler.setFormatter(logFormatter)
logging.getLogger().addHandler(otel_log_handler)


OPENTELEMTRY_ENABLED = os.getenv("OPENTELEMETRY_ENABLED", "False").lower() == "true"

# Instrument SQLAlchemy
if OPENTELEMTRY_ENABLED:
otel_trace_init()
otel_logging_init()

SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)


# Sentry Setup

SENTRY_ENABLED = os.getenv("SENTRY_ENABLED", "False").lower() == "true"
Expand Down Expand Up @@ -221,30 +70,10 @@ async def lifespan(app: FastAPI):
allow_headers=["*"],
)

if OPENTELEMTRY_ENABLED:
FastAPIInstrumentor().instrument_app(app)

router = APIRouter(prefix="/apps/{app_id}/users/{user_id}")

add_pagination(app)


@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request, exc):
current_span = trace.get_current_span()
if (current_span is not None) and (current_span.is_recording()):
current_span.set_attributes(
{
"http.status_text": str(exc.detail),
"otel.status_description": f"{exc.status_code} / {str(exc.detail)}",
"otel.status_code": "ERROR",
}
)
return PlainTextResponse(
json.dumps({"detail": str(exc.detail)}), status_code=exc.status_code
)


app.include_router(apps.router)
app.include_router(users.router)
app.include_router(sessions.router)
Expand Down
22 changes: 0 additions & 22 deletions src/routers/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,28 +59,6 @@ async def create_app(app: schemas.AppCreate, db=db):
schemas.App: Created App object
"""
# USE_AUTH_SERVICE = os.getenv("USE_AUTH_SERVICE", "False").lower() == "true"
# if USE_AUTH_SERVICE:
# AUTH_SERVICE_URL = os.getenv("AUTH_SERVICE_URL", "http://localhost:8001")
# authorization: Optional[str] = request.headers.get("Authorization")
# if authorization:
# scheme, _, token = authorization.partition(" ")
# if token is not None:
# honcho_app = await crud.create_app(db, app=app)
# # if token == "default":
# # return honcho_app
# res = httpx.put(
# f"{AUTH_SERVICE_URL}/organizations",
# json={
# "id": str(honcho_app.id),
# "name": honcho_app.name,
# "token": token,
# },
# )
# data = res.json()
# if data:
# return honcho_app
# else:
try:
honcho_app = await crud.create_app(db, app=app)
return honcho_app
Expand Down
10 changes: 3 additions & 7 deletions src/routers/collections.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException
Expand All @@ -16,12 +15,12 @@
)


@router.get("", response_model=Page[schemas.Collection])
@router.post("/list", response_model=Page[schemas.Collection])
async def get_collections(
app_id: str,
user_id: str,
options: schemas.CollectionGet,
reverse: Optional[bool] = False,
filter: Optional[str] = None,
db=db,
):
"""Get All Collections for a User
Expand All @@ -35,13 +34,10 @@ async def get_collections(
list[schemas.Collection]: List of Collection objects
"""
data = None
if filter is not None:
data = json.loads(filter)
return await paginate(
db,
await crud.get_collections(
db, app_id=app_id, user_id=user_id, filter=data, reverse=reverse
db, app_id=app_id, user_id=user_id, filter=options.filter, reverse=reverse
),
)

Expand Down
15 changes: 6 additions & 9 deletions src/routers/documents.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
from typing import Optional, Sequence
from collections.abc import Sequence
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi import APIRouter, Depends, HTTPException
from fastapi_pagination import Page
from fastapi_pagination.ext.sqlalchemy import paginate

Expand All @@ -16,27 +16,24 @@
)


@router.get("", response_model=Page[schemas.Document])
@router.post("/list", response_model=Page[schemas.Document])
async def get_documents(
app_id: str,
user_id: str,
collection_id: str,
options: schemas.DocumentGet,
reverse: Optional[bool] = False,
filter: Optional[str] = None,
db=db,
):
try:
data = None
if filter is not None:
data = json.loads(filter)
return await paginate(
db,
await crud.get_documents(
db,
app_id=app_id,
user_id=user_id,
collection_id=collection_id,
filter=data,
filter=options.filter,
reverse=reverse,
),
)
Expand Down
Loading

0 comments on commit 7efd7c5

Please sign in to comment.