Processing server extension (#1046) #1069

Merged
20 commits merged on Sep 4, 2023
Changes from 4 commits
67 changes: 58 additions & 9 deletions ocrd_network/ocrd_network/database.py
@@ -1,6 +1,6 @@
""" The database is used to store information regarding jobs and workspaces.

Jobs: for every process-request a job is inserted into the database with a uuid, status and
Jobs: for every process-request a job is inserted into the database with an uuid, status and
information about the process like parameters and file groups. It is mainly used to track the status
(`ocrd_network.models.job.StateEnum`) of a job so that the state of a job can be queried. Finished
jobs are not deleted from the database.
@@ -35,18 +35,69 @@ async def sync_initiate_database(db_url: str):
await initiate_database(db_url)


async def db_get_workspace(workspace_id: str) -> DBWorkspace:
workspace = await DBWorkspace.find_one(
DBWorkspace.workspace_id == workspace_id
)
async def db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
workspace = None
if workspace_id:
workspace = await DBWorkspace.find_one(
DBWorkspace.workspace_id == workspace_id
)
if workspace_mets_path:
workspace = await DBWorkspace.find_one(
DBWorkspace.workspace_mets_path == workspace_mets_path
)
if not workspace:
raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.')
return workspace


@call_sync
async def sync_db_get_workspace(workspace_id: str) -> DBWorkspace:
return await db_get_workspace(workspace_id)
async def sync_db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
return await db_get_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path)


async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs):
workspace = None
if workspace_id:
workspace = await DBWorkspace.find_one(
DBWorkspace.workspace_id == workspace_id
)
if workspace_mets_path:
workspace = await DBWorkspace.find_one(
DBWorkspace.workspace_mets_path == workspace_mets_path
)
if not workspace:
raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.')

job_keys = list(workspace.__dict__.keys())
for key, value in kwargs.items():
if key not in job_keys:
raise ValueError(f'Field "{key}" is not available.')
if key == 'workspace_id':
workspace.workspace_id = value
elif key == 'workspace_mets_path':
workspace.workspace_mets_path = value
elif key == 'ocrd_identifier':
workspace.ocrd_identifier = value
elif key == 'bagit_profile_identifier':
workspace.bagit_profile_identifier = value
elif key == 'ocrd_base_version_checksum':
workspace.ocrd_base_version_checksum = value
elif key == 'ocrd_mets':
workspace.ocrd_mets = value
elif key == 'bag_info_adds':
workspace.bag_info_adds = value
elif key == 'deleted':
workspace.deleted = value
elif key == 'being_processed':
workspace.being_processed = value
else:
raise ValueError(f'Field "{key}" is not updatable.')
await workspace.save()


@call_sync
async def sync_db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs):
await db_update_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path, **kwargs)


async def db_get_processing_job(job_id: str) -> DBProcessorJob:
@@ -68,8 +119,6 @@ async def db_update_processing_job(job_id: str, **kwargs):
if not job:
raise ValueError(f'Processing job with id "{job_id}" not in the DB.')

# TODO: This may not be the best Pythonic way to do it. However, it works!
# There must be a shorter way with Pydantic. Suggest an improvement.
job_keys = list(job.__dict__.keys())
for key, value in kwargs.items():
if key not in job_keys:
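
The TODO removed above asked for a shorter, more Pythonic field update. A minimal sketch of what such a loop could look like with setattr (hypothetical helper, not part of this PR; it assumes the document is a Pydantic/Beanie model exposing its fields via __fields__):

```python
async def db_update_document_fields(document, **kwargs):
    """Generic per-field update, using the Pydantic field registry instead of an if/elif chain."""
    updatable_fields = set(document.__fields__.keys())
    for key, value in kwargs.items():
        if key not in updatable_fields:
            raise ValueError(f'Field "{key}" is not updatable.')
        setattr(document, key, value)
    await document.save()
```
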
9 changes: 9 additions & 0 deletions ocrd_network/ocrd_network/models/job.py
@@ -7,9 +7,15 @@


class StateEnum(str, Enum):
# The processing job is cached inside the Processing Server requests cache
cached = 'CACHED'
# The processing job is queued inside the RabbitMQ
queued = 'QUEUED'
# Processing job is currently running in a Worker or Processor Server
running = 'RUNNING'
# Processing job finished successfully
success = 'SUCCESS'
# Processing job failed
failed = 'FAILED'


@@ -28,6 +34,8 @@ class PYJobInput(BaseModel):
# Used to toggle between sending requests to 'worker and 'server',
# i.e., Processing Worker and Processor Server, respectively
agent_type: Optional[str] = 'worker'
# Auto generated by the Processing Server when forwarding to the Processor Server
job_id: Optional[str] = None

class Config:
schema_extra = {
@@ -67,6 +75,7 @@ class DBProcessorJob(Document):
parameters: Optional[dict]
result_queue_name: Optional[str]
callback_url: Optional[str]
internal_callback_url: Optional[str]
start_time: Optional[datetime]
end_time: Optional[datetime]
exec_time: Optional[str]
3 changes: 3 additions & 0 deletions ocrd_network/ocrd_network/models/workspace.py
@@ -15,6 +15,8 @@ class DBWorkspace(Document):
ocrd_mets Ocrd-Mets (optional)
bag_info_adds bag-info.txt can also (optionally) contain additional
key-value-pairs which are saved here
deleted the document is deleted if set, however, the record is still preserved
being_processed whether the workspace is currently used in a workflow execution or not
"""
workspace_id: str
workspace_mets_path: str
@@ -24,6 +26,7 @@
ocrd_mets: Optional[str]
bag_info_adds: Optional[dict]
deleted: bool = False
being_processed: bool = False

class Settings:
name = "workspace"
116 changes: 94 additions & 22 deletions ocrd_network/ocrd_network/processing_server.py
@@ -3,6 +3,7 @@
import httpx
from typing import Dict, List
import uvicorn
from queue import Queue

from fastapi import FastAPI, status, Request, HTTPException
from fastapi.exceptions import RequestValidationError
@@ -11,18 +12,26 @@
from pika.exceptions import ChannelClosedByBroker

from ocrd_utils import getLogger
from .database import initiate_database
from .database import (
initiate_database,
db_get_workspace,
db_update_workspace
)
from .deployer import Deployer
from .models import (
DBProcessorJob,
PYJobInput,
PYJobOutput,
StateEnum
)
from .rabbitmq_utils import RMQPublisher, OcrdProcessingMessage
from .rabbitmq_utils import (
RMQPublisher,
OcrdProcessingMessage,
OcrdResultMessage
)
from .server_utils import (
_get_processor_job,
validate_and_resolve_mets_path,
validate_and_return_mets_path,
validate_job_input,
)
from .utils import (
@@ -69,6 +78,11 @@ def __init__(self, config_path: str, host: str, port: int) -> None:
# Gets assigned when `connect_publisher` is called on the working object
self.rmq_publisher = None

# Used for buffering/caching processing requests in the Processing Server
# Key: `workspace_id` or `path_to_mets` depending on which is provided
# Value: Queue that holds PYInputJob elements
self.processing_requests_cache = {}

# Create routes
self.router.add_api_route(
path='/stop',
@@ -102,6 +116,15 @@ def __init__(self, config_path: str, host: str, port: int) -> None:
response_model_exclude_none=True
)

self.router.add_api_route(
path='/processor/result_callback/{job_id}',
endpoint=self.remove_from_request_cache,
methods=['POST'],
tags=['processing'],
status_code=status.HTTP_200_OK,
summary='Callback used by a worker or processor server for successful processing of a request',
)

self.router.add_api_route(
path='/processor/{processor_name}',
endpoint=self.get_processor_info,
@@ -266,16 +289,76 @@ def query_ocrd_tool_json_from_server(self, processor_name):
return ocrd_tool, processor_server_url

async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJobOutput:
if data.job_id:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Job id field is set but must not be: {data.job_id}"
)
data.job_id = generate_id() # Generate processing job id

if data.agent_type not in ['worker', 'server']:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Unknown network agent with value: {data.agent_type}"
)
workspace_db = await db_get_workspace(
workspace_id=data.workspace_id,
workspace_mets_path=data.path_to_mets
)
if not workspace_db:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Workspace with id: {data.workspace_id} or path: {data.path_to_mets} not found"
)

# The workspace is currently locked (being processed)
# TODO: Do the proper caching here, after the refactored code is working

Review comment (Member):
  • Change being_processed in the DBWorkspace from boolean to string
  • Serialize data without page_id in a canonical way, set being_processed to that when locking
  • check here whether canonical serialization of data is the same as being_processed. If so, allow the request, even though workspace_db is being_processed.
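
A minimal sketch of that canonical-serialization check (hypothetical helper names, not code from this PR):

```python
import json
from typing import Optional

def canonical_serialization(request_data: dict) -> str:
    """Serialize a processing request without its page_id in a stable, comparable way."""
    payload = {key: value for key, value in request_data.items() if key != 'page_id'}
    return json.dumps(payload, sort_keys=True, default=str)

def may_enter_workspace(request_data: dict, being_processed: Optional[str]) -> bool:
    """Allow the request if the workspace is free, or locked by an identical (page_id-less) request."""
    if not being_processed:
        return True
    return canonical_serialization(request_data) == being_processed
```
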

Collaborator:
Objection by @MehmedGIT in today's call: Processing Server clients (like the Workflow Server) could decide to split up the job into several page ranges (esp. if they know there are multiple worker instances running in the back) and issue them concurrently. So instead of a boolean or string, we would need to synchronise over the actual set of pages of each workspace.

Contributor Author (@MehmedGIT, Jul 11, 2023):
> • check here whether canonical serialization of data is the same as being_processed. If so, allow the request, even though workspace_db is being_processed.

This does not achieve proper workspace locking when the same request is created with overlapping page ranges for the same workspace.

> we would need to synchronise over the actual set of pages of each workspace.

This is the way to achieve proper page-range locking to prevent collisions at the processor server/worker level; however, it is also complex to implement. The more I think about it, the more problematic it seems once error handling is considered.

  1. Ideal scenario example: 200 pages workspace, the Workflow Server will create 4 processing requests (50 pages per request) when knowing there are 4 workers of the specified processor. Then each request's page_id field will be in the form start_page_id..end_page_id. The being_processed boolean field will be replaced with a str field locked_pages in the form start_page_id1..end_page_id1,...,start_page_id4..end_page_id4. Then detecting overlapping single pages or page ranges is achievable to raise errors for the next coming requests. If the ranges don't overlap and match for the same processor, then the workspace would not be considered locked. For other processors, the workspace will still be considered locked. The unlocking of the workspace for pages will then happen based on the internal callbacks to the Processing Server from the Processor Server or Processing Worker when the execution finishes or fails.

Problem: still, if either of the two fails to use a callback, say due to a crash, the workspace will be locked indefinitely. Determining a proper timeout for a server or worker is hard.

2. Another point:
    Considering that page_id could be:
  • a single page - PHYS0001
  • multiple pages separated by a comma - PHYS0003,PHYS0004,PHYS0005
  • a page range... PHYS0009..PHYS0015

And now assuming a potential scenario where 3 processing requests with different page_id formats above are passed - the value of locked_pages for the 3 requests will be PHYS0001_PHYS0003,PHYS0004,PHYS0005_PHYS0009..PHYS0015 assuming that the separator will be an underscore. Or just a string list with values [PHYS0001][PHYS0003,PHYS0004,PHYS0005][PHYS0009..PHYS0015].

Potential problem: locked_pages is filled with a lot of values in case single-page requests are passed for the entire workspace. Another thing to consider is collision detection: the incoming page_id can have 3 different forms, and hence a lot of extra overhead in handling the data.
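
For illustration, the three page_id forms could be resolved into a flat set of pages roughly like this (hypothetical helper; assumes PHYS-style identifiers with a fixed-width numeric suffix):

```python
def resolve_page_ids(page_id: str) -> set:
    """Expand 'PHYS0001', 'PHYS0003,PHYS0004,PHYS0005' or 'PHYS0009..PHYS0015' into a set of page ids."""
    pages = set()
    for part in page_id.split(','):
        if '..' in part:
            start, end = part.split('..')
            prefix = start.rstrip('0123456789')   # e.g. 'PHYS'
            width = len(start) - len(prefix)      # zero-padding width, e.g. 4
            for number in range(int(start[len(prefix):]), int(end[len(prefix):]) + 1):
                pages.add(f'{prefix}{number:0{width}d}')
        else:
            pages.add(part)
    return pages

# resolve_page_ids('PHYS0001,PHYS0009..PHYS0011') -> {'PHYS0001', 'PHYS0009', 'PHYS0010', 'PHYS0011'}
```
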

Collaborator:
> 1. [...] Then detecting overlapping single pages or page ranges is achievable to raise errors for the next coming requests.

Why raise errors? IMO, locking is simply about blocking requests until the associated resources become free again.

> If the ranges don't overlap and match for the same processor, then the workspace would not be considered locked. For other processors, the workspace will still be considered locked.

Why make this contingent on the identity of the processor? IMO, if the other request concerns a disjunct set of pages, then there simply cannot be a dependency relation to the currently running processor(s). It could be because the other request is prior (lagging behind) or posterior (more advanced) in the overall workflow across pages. Or it could simply be an independent request. (Conversely, independent requests which do affect the same page range will gain an artificial dependency relation here, but that's not that much of a problem.)

If anything, if we want to look more closely into dependencies, we should consider pairs set(pages) · fileGrp, where

  • a new request must block if its pages and input fileGrp overlap the locked pairs, and
  • a new job will lock its pages and output fileGrp while running

Thus, even requests on the same pages but regarding distinct fileGrps would stay independent.
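
A minimal sketch of such set(pages) · fileGrp locking (hypothetical data structure, not code from this PR):

```python
from collections import defaultdict

class WorkspacePageLocks:
    """Tracks which pages are locked under which fileGrp for a single workspace."""

    def __init__(self):
        self.locked = defaultdict(set)  # fileGrp -> set of locked page ids

    def is_blocked(self, input_file_grps, pages) -> bool:
        """A new request must block if its pages overlap the locked pages of any of its input fileGrps."""
        return any(self.locked[grp] & set(pages) for grp in input_file_grps)

    def lock(self, output_file_grp, pages):
        """A new job locks its pages under its output fileGrp while running."""
        self.locked[output_file_grp] |= set(pages)

    def unlock(self, output_file_grp, pages):
        self.locked[output_file_grp] -= set(pages)
```
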

Collaborator:
> Potential problem: locked_pages is filled with a lot of values

Not a problem: simply use the actual resolved list (or rather, set) of pages! Set operations (disjunction for adding, intersection for testing, difference for clearing) are clear and efficient.
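
Those operations map directly onto Python sets (a toy illustration with made-up page ids):

```python
locked_pages = {'PHYS0001', 'PHYS0002'}
incoming = {'PHYS0002', 'PHYS0003'}

conflict = locked_pages & incoming   # intersection for testing: {'PHYS0002'}
locked_pages |= incoming             # union (disjunction) for adding when locking
locked_pages -= incoming             # difference for clearing when a job finishes
```
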

Contributor Author:
> Why raise errors? IMO, locking is simply about blocking requests until the associated resources become free again.

Right. I was just considering the cases where double executions of the same processor with overlapping pages are submitted. That would mostly end in error, wouldn't it? Unless overwrite is already set for the next requests.

> Why make this contingent on the identity of the processor?

Because the workspace will be unlocked for the same processor requests that potentially can have different page ranges. The collisions for the same processor but different pages are handled by the METS server. However, if we get a different processor but the same pages, then the workspace is locked and that request goes into the waiting internal queue (CACHED). Consider cases when running processors cannot be done independently, i.e., the output of the first processor has to be used by the second processor.

> IMO, if the other request concerns a disjunct set of pages, then there simply cannot be a dependency relation to the currently running processor(s)

Right, that means we must track processor · set(pages) · fileGrp (considering the info from the next paragraph). This makes it even harder to track (and potentially debug) what to submit directly to the RabbitMQ and what to queue internally (to cache).

> If anything, if we want to look more closely into dependencies, we should consider pairs set(pages) · fileGrp, where
> - a new request must block if its pages and input fileGrp overlap the locked pairs, and
> - a new job will lock its pages and output fileGrp while running
> Thus, even requests on the same pages but regarding distinct fileGrps would stay independent.

Oh, right. We should also consider fileGrp in the mix of complexity...

> Not a problem: simply use the actual resolved list (or rather, set) of pages! Set operations (disjunction for adding, intersection for testing, difference for clearing) are clear and efficient.

This is of course simpler to manage although there will be more stress on the DB itself with reads/writes - which I guess is okay for now.

> So we'll now have cached and queued and running job entries. For running jobs, we agreed there is a need for some kind of timeout anyway, universally

Yes, we have some dummy timeout right now based on the submitted amount of pages (set to 200 by default, multiplied by a timeout value per page), which could potentially lead to timeout errors even when the output is correctly produced, say due to slower processing. The amount of pages is still not part of the processing request.

> BTW, IINM we still need to take care of atomicity in some places, i.e. use a mutex to query and change the internal queue...

Yes, there will be a race condition to the internal queue resource.
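
For both points, a sketch of what a page-count-based timeout and a lock around the internal requests cache could look like (hypothetical names and values, not this PR's implementation):

```python
import asyncio
from queue import Queue

DEFAULT_AMOUNT_OF_PAGES = 200   # assumed fallback when the request carries no page count
TIMEOUT_PER_PAGE = 30           # assumed seconds per page

def compute_job_timeout(amount_of_pages=None) -> int:
    return (amount_of_pages or DEFAULT_AMOUNT_OF_PAGES) * TIMEOUT_PER_PAGE

# A single asyncio.Lock serializes access to the requests cache and avoids the race
# between the request endpoint and the result callback.
cache_lock = asyncio.Lock()
processing_requests_cache = {}

async def cache_request(workspace_key: str, data) -> None:
    async with cache_lock:
        processing_requests_cache.setdefault(workspace_key, Queue()).put(data)
```
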

Member:
> IMO, if the other request concerns a disjunct set of pages, then there simply cannot be a dependency relation to the currently running processor(s)

> Right, that means we must track processor · set(pages) · fileGrp (considering the info from the next paragraph). This makes it even harder to track (and potentially debug) what to submit directly to the RabbitMQ and what to queue internally (to cache).

Sorry to add to the complexity but we also need the parameterization of the processor in the mix, so processor · parameters · set(pages) · fileGrp 😬
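
A tiny illustration of such a composite key (hypothetical; parameters serialized canonically so that equal parameterizations compare equal):

```python
import json

def lock_key(processor: str, parameters: dict, file_grp: str) -> tuple:
    """Composite lock key: processor · parameters · fileGrp; the page set is tracked per key."""
    canonical_parameters = json.dumps(parameters, sort_keys=True)
    return processor, canonical_parameters, file_grp
```
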

Collaborator (@bertsky, Jul 12, 2023):
> I was just considering the cases where double executions of the same processor with overlapping pages are submitted. That would mostly end in error, wouldn't it? Unless overwrite is already set for the next requests.

Oh that. Yes, that's right. (If the METS already contains results for a page-fileGrp.) But for that we don't have to do anything (detecting overlaps etc) – this error will be coming from the processor itself (it already contains logic looking for conflicts in the output fileGrp / page list).

> Why make this contingent on the identity of the processor?

> Because the workspace will be unlocked for the same processor requests that potentially can have different page ranges. The collisions for the same processor but different pages are handled by the METS server. However, if we get a different processor but the same pages, then the workspace is locked and that request goes into the waiting internal queue (CACHED). Consider cases when running processors cannot be done independently, i.e., the output of the first processor has to be used by the second processor.

This sounds confusing to me. My above analysis still stands IMHO – I don't see any reason why we should look at the identity of the processor. (But I agree with @kba that if we do, then it would really have to be the combination of processor and parameters.) It's just the pages and the fileGrps that cause a dependency.

Regarding timeouts (using the actual number of pages in the Processor Server model, adding a timeout mechanism in the Processing Worker) – should we track that in a separate issue?

Contributor Author:
> Oh that. Yes, that's right. But for that we don't have to do anything

Agree, we should not try to do early prevention of errors since it makes the implementation more complex on the Processing Server side.

> It's just the pages and the fileGrps that cause a dependency.

For the sake of keeping it simple - it should be just pages and fileGrp of a workspace then.

Collaborator:
> Regarding timeouts (using the actual number of pages in the Processor Server model, adding a timeout mechanism in the Processing Worker) – should we track that in a separate issue?

#1074

> Agree, we should not try to do early prevention of errors since it makes the implementation more complex on the Processing Server side.

Yes. If we want to do anticipation of workflow conflicts then it would be what currently is done in ocrd process via ocrd.task_sequence.validate_tasks() – checking that

  • all processors exist
  • all parameters are valid for all steps
  • no output fileGrp exists prior to that step (on the data or the workflow itself), except when using --overwrite
  • no input fileGrp will be missing for any step (because already on the data or generated earlier on the workflow)

That could be done in the Workflow Server statically – before sending requests.
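
A conceptual sketch of those static checks (illustrative only; not the actual ocrd.task_sequence.validate_tasks() code, and parameter validation against ocrd-tool.json is omitted):

```python
def validate_workflow(tasks, existing_file_grps, known_processors, overwrite=False):
    """tasks: list of dicts with 'executable', 'input_file_grps' and 'output_file_grps'."""
    errors = []
    available = set(existing_file_grps)
    for task in tasks:
        # all processors must exist
        if task['executable'] not in known_processors:
            errors.append(f"unknown processor: {task['executable']}")
        # no input fileGrp may be missing (already on the data or generated by an earlier step)
        for grp in task['input_file_grps']:
            if grp not in available:
                errors.append(f"missing input fileGrp {grp} for {task['executable']}")
        # no output fileGrp may exist prior to that step, except when using --overwrite
        for grp in task['output_file_grps']:
            if grp in available and not overwrite:
                errors.append(f"output fileGrp {grp} already exists for {task['executable']}")
            available.add(grp)
    return errors
```
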

if workspace_db.being_processed:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Workspace with id: {data.workspace_id} or "
f"path: {data.path_to_mets} is currently being processed"
)

workspace_key = data.workspace_id if data.workspace_id else data.path_to_mets
# If a record queue of this workspace_id does not exist in the requests cache
if not self.processing_requests_cache.get(workspace_key, None):
self.processing_requests_cache[workspace_key] = Queue()
# Add the processing request to the internal queue
self.processing_requests_cache[workspace_key].put(data)

data = self.processing_requests_cache[workspace_key].get()
# Lock the workspace
await db_update_workspace(
workspace_id=data.workspace_id,
workspace_mets_path=data.path_to_mets,
being_processed=True
)

# Since the path is not resolved yet,
# the return value is not important for the Processing Server
await validate_and_return_mets_path(self.log, data)

# Create a DB entry
job = DBProcessorJob(
**data.dict(exclude_unset=True, exclude_none=True),
processor_name=processor_name,
internal_callback_url=f"/processor/result_callback/{data.job_id}",
state=StateEnum.queued
)
await job.insert()

job_output = None
if data.agent_type == 'worker':
job_output = await self.push_to_processing_queue(processor_name, data)
ocrd_tool = await self.get_processor_info(processor_name)
validate_job_input(self.log, processor_name, ocrd_tool, data)
processing_message = self.create_processing_message(job)
await self.push_to_processing_queue(processor_name, processing_message)
job_output = job.to_job_output()
if data.agent_type == 'server':
job_output = await self.push_to_processor_server(processor_name, data)
ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name)
validate_job_input(self.log, processor_name, ocrd_tool, data)
job_output = await self.push_to_processor_server(processor_name, processor_server_url, data)
if not job_output:
self.log.exception('Failed to create job output')
raise HTTPException(
@@ -285,10 +368,7 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ
return job_output

# TODO: Revisit and remove duplications between push_to_* methods
async def push_to_processing_queue(self, processor_name: str, job_input: PYJobInput) -> PYJobOutput:
ocrd_tool = await self.get_processor_info(processor_name)
validate_job_input(self.log, processor_name, ocrd_tool, job_input)
job_input = await validate_and_resolve_mets_path(self.log, job_input, resolve=False)
async def push_to_processing_queue(self, processor_name: str, processing_message: OcrdProcessingMessage):
if not self.rmq_publisher:
raise Exception('RMQPublisher is not connected')
deployed_processors = self.deployer.find_matching_processors(
@@ -299,14 +379,6 @@ async def push_to_processing_queue(self, processor_name: str, job_input: PYJobIn
if processor_name not in deployed_processors:
self.check_if_queue_exists(processor_name)

job = DBProcessorJob(
**job_input.dict(exclude_unset=True, exclude_none=True),
job_id=generate_id(),
processor_name=processor_name,
state=StateEnum.queued
)
await job.insert()
processing_message = self.create_processing_message(job)
encoded_processing_message = OcrdProcessingMessage.encode_yml(processing_message)
try:
self.rmq_publisher.publish_to_queue(processor_name, encoded_processing_message)
@@ -316,12 +388,8 @@ async def push_to_processing_queue(self, processor_name: str, job_input: PYJobIn
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f'RMQPublisher has failed: {error}'
)
return job.to_job_output()

async def push_to_processor_server(self, processor_name: str, job_input: PYJobInput) -> PYJobOutput:
ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name)
validate_job_input(self.log, processor_name, ocrd_tool, job_input)
job_input = await validate_and_resolve_mets_path(self.log, job_input, resolve=False)
async def push_to_processor_server(self, processor_name: str, processor_server_url: str, job_input: PYJobInput) -> PYJobOutput:
try:
json_data = json.dumps(job_input.dict(exclude_unset=True, exclude_none=True))
except Exception as e:
@@ -357,6 +425,10 @@ async def push_to_processor_server(self, processor_name: str, job_input: PYJobIn
async def get_processor_job(self, processor_name: str, job_id: str) -> PYJobOutput:
return await _get_processor_job(self.log, processor_name, job_id)

async def remove_from_request_cache(self, processor_name: str, job_id: str, ocrd_result: OcrdResultMessage):
# TODO: Implement, after the refactored code is working
pass

async def get_processor_info(self, processor_name) -> Dict:
""" Return a processor's ocrd-tool.json
"""