Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop browsers that weren't accessed in a long time. #41

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ COPY pyproject.toml /usr/local/ocrd-monitor/
COPY noxfile.py /usr/local/ocrd-monitor/
COPY tests /usr/local/ocrd-monitor/tests

RUN curl -sSL https://pdm.fming.dev/install-pdm.py | python3 -
RUN pip3 install pdm
ENV PATH="/root/.local/bin:${PATH}"

WORKDIR /usr/local/ocrd-monitor
Expand Down
1 change: 1 addition & 0 deletions init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ fi

export MONITOR_DB_CONNECTION_STRING=$MONITOR_DB_CONNECTION
export OCRD_BROWSER__MODE=native
export OCRD_BROWSER__TIMEOUT="P0DT1H0M0S"
export OCRD_BROWSER__WORKSPACE_DIR=/data/ocr-d
export OCRD_BROWSER__PORT_RANGE="[9000,9100]"
export OCRD_LOGVIEW__PORT=$MONITOR_PORT_LOG
Expand Down
25 changes: 15 additions & 10 deletions ocrdbrowser/_client.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from __future__ import annotations

import asyncio
import asyncio
import logging

from types import TracebackType
from typing import AsyncContextManager, Type, cast

import httpx
from websockets import client
from websockets.exceptions import ConnectionClosed
from websockets.exceptions import ConnectionClosed, ConnectionClosedError
from websockets.legacy.client import WebSocketClientProtocol
from websockets.typing import Subprotocol

Expand Down Expand Up @@ -51,7 +51,11 @@ async def receive_bytes(self) -> bytes:
return bytes()

return cast(bytes, await self._open_connection.recv())
except ConnectionClosed:
except (
ConnectionClosed,
ConnectionClosedError,
asyncio.exceptions.IncompleteReadError,
):
raise ChannelClosed()

async def send_bytes(self, data: bytes) -> None:
Expand All @@ -60,26 +64,27 @@ async def send_bytes(self, data: bytes) -> None:
return

await self._open_connection.send(data)
except ConnectionClosed:
except (
ConnectionClosed,
ConnectionClosedError,
asyncio.exceptions.IncompleteReadError,
):
raise ChannelClosed()


class HttpBrowserClient:
def __init__(self, address: str) -> None:
self.address = address

async def get(self, resource: str, retry: bool=True) -> bytes:
async def get(self, resource: str, retry: bool = True) -> bytes:
try:

async with httpx.AsyncClient(
base_url=self.address
) as client:
async with httpx.AsyncClient(base_url=self.address) as client:
response = await client.get(resource)
return response.content
except Exception as ex:
logging.info(f"is instance is {isinstance(ex, httpx.RemoteProtocolError)}")
logging.info(f"retry value is {retry}")
if isinstance(ex, httpx.RemoteProtocolError) and retry :
if isinstance(ex, httpx.RemoteProtocolError) and retry:
await asyncio.sleep(10)
return await self.get(resource, False)

Expand Down
6 changes: 5 additions & 1 deletion ocrdbrowser/_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ def _try_kill(pid: int) -> None:
try:
os.kill(pid, signal.SIGKILL)
except ProcessLookupError:
logging.warning(f"Could not find process with ID {pid}")
logging.getLogger(__file__).warning(f"Could not find process with ID {pid}")
except RuntimeError as ex:
logging.getLogger(__file__).warning(
f"Got an unexpected runtime error:\n {ex}"
)

def client(self) -> OcrdBrowserClient:
    """Create an HTTP client pointed at this browser's address."""
    browser_address = self.address()
    return HttpBrowserClient(browser_address)
Expand Down
35 changes: 32 additions & 3 deletions ocrdmonitor/database/_browserprocessrepository.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from beanie.odm.queries.find import FindMany
from typing import Any, Collection, Mapping
from datetime import datetime
from typing import Any, Callable, Collection, Mapping

import pymongo
from beanie import Document
from beanie.odm.queries.find import FindMany

from ocrdbrowser import OcrdBrowser
from ocrdmonitor.protocols import BrowserRestoringFactory
Expand All @@ -12,6 +14,7 @@ class BrowserProcess(Document):
owner: str
process_id: str
workspace: str
last_access_time: datetime

class Settings:
indexes = [
Expand All @@ -25,15 +28,19 @@ class Settings:


class MongoBrowserProcessRepository:
def __init__(self, restoring_factory: BrowserRestoringFactory) -> None:
def __init__(
self, restoring_factory: BrowserRestoringFactory, clock: Callable[[], datetime]
) -> None:
self._restoring_factory = restoring_factory
self._clock = clock

async def insert(self, browser: OcrdBrowser) -> None:
    """Store a record for ``browser``, stamped with the current clock time."""
    # Gather the document fields first; ``last_access_time`` comes from the
    # clock injected into the repository, not from the browser itself.
    fields = {
        "address": browser.address(),
        "owner": browser.owner(),
        "process_id": browser.process_id(),
        "workspace": browser.workspace(),
        "last_access_time": self._clock(),
    }
    document = BrowserProcess(**fields)  # type: ignore
    await document.insert()

async def delete(self, browser: OcrdBrowser) -> None:
Expand Down Expand Up @@ -101,6 +108,28 @@ async def first(self, owner: str, workspace: str) -> OcrdBrowser | None:
result.process_id,
)

async def update_access_time(self, browser: OcrdBrowser) -> None:
    """Set the stored ``last_access_time`` of ``browser`` to the current clock time.

    The record is looked up by the browser's owner and workspace. Nothing
    happens when no matching record is found.
    """
    owner_matches = BrowserProcess.owner == browser.owner()
    workspace_matches = BrowserProcess.workspace == browser.workspace()
    record = await BrowserProcess.find_one(owner_matches, workspace_matches)

    if record is not None:
        record.last_access_time = self._clock()
        await record.save()

async def browsers_accessed_before(self, time: datetime) -> list[OcrdBrowser]:
    """Return browsers whose stored ``last_access_time`` is earlier than ``time``."""
    # Fields that live on the stored document but are not handed to the
    # restoring factory.
    non_factory_fields = {"id", "revision_id", "last_access_time"}
    stale_query = BrowserProcess.find(BrowserProcess.last_access_time < time)

    restored: list[OcrdBrowser] = []
    for record in await stale_query.to_list():
        factory_kwargs = record.model_dump(exclude=non_factory_fields)
        restored.append(self._restoring_factory(**factory_kwargs))
    return restored

async def count(self) -> int:
    """Return the value reported by ``BrowserProcess.count()``."""
    total = await BrowserProcess.count()
    return total

Expand Down
3 changes: 2 additions & 1 deletion ocrdmonitor/environment.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
import functools
from typing import Callable, Type

Expand Down Expand Up @@ -33,7 +34,7 @@ async def repositories(self) -> Repositories:
await database.init(self.settings.monitor_db_connection_string)
restoring_factory = RestoringFactories[self.settings.ocrd_browser.mode]
return Repositories(
database.MongoBrowserProcessRepository(restoring_factory),
database.MongoBrowserProcessRepository(restoring_factory, datetime.now),
database.MongoJobRepository(),
)

Expand Down
14 changes: 12 additions & 2 deletions ocrdmonitor/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
from ocrdmonitor.environment import ProductionEnvironment
from ocrdmonitor.server.settings import Settings

from ocrdmonitor.server import lifespan
from ocrdmonitor.server.app import create_app
from ocrdmonitor.server.lifespan import processtimeout, unreachable_cleanup
from ocrdmonitor.server.settings import Settings


settings = Settings()
environment = ProductionEnvironment(settings)
app = create_app(environment)
# Build the application with an explicit lifespan: the unreachable-browser
# cleanup is passed as a setup task, the expiration loop as a background task.
# Both are passed as already-created coroutine objects.
app = create_app(
environment,
lifespan.create(
setup=[unreachable_cleanup.clean_unreachable_browsers(environment)],
background=[processtimeout.expiration_loop(environment)],
),
)
6 changes: 6 additions & 0 deletions ocrdmonitor/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ async def find(
# Look up a single browser by owner and workspace (both keyword-only).
# The return type allows None, presumably for the "no match" case.
async def first(self, *, owner: str, workspace: str) -> OcrdBrowser | None:
...

# Refresh the recorded access time of `browser`.
# The Mongo implementation in this change set stamps it with its injected
# clock and silently does nothing when no record matches.
async def update_access_time(self, browser: OcrdBrowser) -> None:
...

# Return the browsers whose last recorded access is earlier than `time`.
# NOTE(review): this hunk adds no `datetime` import - confirm that the module
# already imports it (or defers annotation evaluation), otherwise this
# signature fails at import time.
async def browsers_accessed_before(self, time: datetime) -> list[OcrdBrowser]:
...

# Presumably the number of browser processes known to the repository; the
# body here is only a protocol stub.
async def count(self) -> int:
...

Expand Down
7 changes: 4 additions & 3 deletions ocrdmonitor/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
from fastapi.templating import Jinja2Templates

from ocrdmonitor.protocols import Environment

from ocrdmonitor.server import lifespan
from ocrdmonitor.server.index import create_index
from ocrdmonitor.server.jobs import create_jobs
from ocrdmonitor.server.lifespan import lifespan
from ocrdmonitor.server.logs import create_logs
from ocrdmonitor.server.logview import create_logview
from ocrdmonitor.server.workflows import create_workflows
Expand All @@ -22,8 +23,8 @@
TEMPLATE_DIR = PKG_DIR / "templates"


def create_app(environment: Environment) -> FastAPI:
app = FastAPI(lifespan=lifespan(environment))
def create_app(environment: Environment, lifespan: lifespan.Lifespan) -> FastAPI:
app = FastAPI(lifespan=lifespan)
templates = Jinja2Templates(TEMPLATE_DIR)
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")

Expand Down
34 changes: 0 additions & 34 deletions ocrdmonitor/server/lifespan.py

This file was deleted.

36 changes: 36 additions & 0 deletions ocrdmonitor/server/lifespan/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import asyncio
from contextlib import asynccontextmanager
from typing import (
Any,
AsyncContextManager,
AsyncIterator,
Callable,
Coroutine,
Iterable,
)

from fastapi import FastAPI


# A lifespan callable: invoked with the FastAPI app, it returns an async
# context manager that yields nothing.
Lifespan = Callable[[FastAPI], AsyncContextManager[None]]
# Coroutine objects handed to `create` for scheduling as tasks.
Tasks = Iterable[Coroutine[Any, Any, Any]]


def create(setup: Tasks = (), background: Tasks = ()) -> Lifespan:
    """Build a FastAPI lifespan from startup and background coroutines.

    Args:
        setup: Coroutines that must all finish before the application starts
            serving. They run concurrently with each other.
        background: Coroutines that keep running while the application is up.
            They are cancelled when the application shuts down.

    Returns:
        A lifespan callable suitable for ``FastAPI(lifespan=...)``. Because
        ``setup`` and ``background`` hold coroutine objects, the returned
        lifespan can only be entered once.
    """

    @asynccontextmanager
    async def _lifespan(_: FastAPI) -> AsyncIterator[None]:
        # we're using task group here, because asyncio
        # only stores weak refs to tasks and cleans them up
        # if they aren't referenced anywhere. See:
        # https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
        async with asyncio.TaskGroup() as setup_group:
            for task in setup:
                setup_group.create_task(task)

        async with asyncio.TaskGroup() as background_group:
            running = [background_group.create_task(task) for task in background]
            try:
                yield
            finally:
                # Leaving the task group waits for every child task. Background
                # tasks may loop forever, so without an explicit cancel the
                # shutdown would never complete.
                for task in running:
                    task.cancel()

    return _lifespan
41 changes: 41 additions & 0 deletions ocrdmonitor/server/lifespan/processtimeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import asyncio
import logging

from datetime import datetime, timedelta
from typing import Any, Callable, Coroutine

from ocrdmonitor.protocols import BrowserProcessRepository, Environment

# Set this module's logger to INFO level.
# NOTE(review): the logger is keyed by __file__ rather than the conventional
# __name__; every lookup must use the same key to reach this logger.
logging.getLogger(__file__).setLevel(logging.INFO)

# Default pause between two expiration runs, in seconds (one hour).
EXPIRATION_LOOP_INTERVAL_SECONDS = 3600


def _never_cancel() -> bool:
return False


async def shutdown_expired(
repository: BrowserProcessRepository,
process_timeout: timedelta,
clock: Callable[[], datetime],
) -> None:
old_processes = await repository.browsers_accessed_before(clock() - process_timeout)
async with asyncio.TaskGroup() as group:
for process in old_processes:
group.create_task(process.stop())
group.create_task(repository.delete(process))


async def expiration_loop(
    environment: Environment,
    cancellation_fn: Callable[[], bool] = _never_cancel,
    loop_interval: int = EXPIRATION_LOOP_INTERVAL_SECONDS,
) -> None:
    """Repeatedly shut down expired browsers until cancellation is requested.

    Args:
        environment: Provides the browser process repository and the
            configured browser timeout.
        cancellation_fn: Checked before every run; the loop ends as soon as
            it returns ``True``.
        loop_interval: Seconds to sleep after each run.
    """
    repositories = await environment.repositories()
    browser_repository = repositories.browser_processes

    while True:
        if cancellation_fn():
            break

        logging.getLogger(__file__).info("Running expiration loop")
        # The timeout is re-read from the settings on every run.
        timeout = environment.settings.ocrd_browser.timeout
        await shutdown_expired(browser_repository, timeout, datetime.now)
        await asyncio.sleep(loop_interval)
19 changes: 19 additions & 0 deletions ocrdmonitor/server/lifespan/unreachable_cleanup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import asyncio

from ocrdbrowser import OcrdBrowser
from ocrdmonitor.protocols import BrowserProcessRepository, Environment


async def clean_unreachable_browsers(environment: Environment) -> None:
    """Check every stored browser concurrently and drop the unreachable ones.

    All browsers returned by the repository's ``find()`` are passed to
    ``ping_or_delete``; the function returns once every check has finished.
    """
    repositories = await environment.repositories()
    browser_repository = repositories.browser_processes
    known_browsers = await browser_repository.find()

    async with asyncio.TaskGroup() as checks:
        for candidate in known_browsers:
            checks.create_task(ping_or_delete(browser_repository, candidate))


async def ping_or_delete(repo: BrowserProcessRepository, browser: OcrdBrowser) -> None:
    """Request ``/`` from ``browser`` and delete its record on ``ConnectionError``.

    Any other exception raised by the client propagates to the caller.
    """
    try:
        http_client = browser.client()
        await http_client.get("/")
    except ConnectionError:
        await repo.delete(browser)
6 changes: 3 additions & 3 deletions ocrdmonitor/server/settings.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from __future__ import annotations
from datetime import timedelta

import functools
import os
Expand Down Expand Up @@ -50,6 +51,7 @@ class OcrdBrowserSettings(BaseSettings):
workspace_dir: Path
mode: Literal["native", "docker"] = "native"
port_range: tuple[int, int]
timeout: timedelta

@field_validator("port_range", mode="before")
@classmethod
Expand Down Expand Up @@ -102,9 +104,7 @@ def getargs(field_name: str, model_type: Type[BaseModel]) -> dict[str, str]:
model_field_name: f"{field_name}__{model_field_name}".upper()
for model_field_name in model_type.model_fields
}
return {
field: os.environ.get(var, "") for field, var in fields_to_env.items()
}
return {field: os.environ.get(var, "") for field, var in fields_to_env.items()}


class OcrdEnvSource(EnvSettingsSource):
Expand Down
Loading