Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend the network client #1269

Merged
merged 35 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
2927a28
add integration test for client
MehmedGIT Aug 6, 2024
8e7cd3e
fix the test dir path in docker
MehmedGIT Aug 6, 2024
bd16dd7
update network client
MehmedGIT Aug 6, 2024
b2c0675
integration test for client
MehmedGIT Aug 6, 2024
db6e566
Fix flag typo
MehmedGIT Aug 6, 2024
bec81ba
try docker host ip
MehmedGIT Aug 6, 2024
4815896
remove the client server
MehmedGIT Aug 9, 2024
cb3460f
refactor status checks
MehmedGIT Aug 9, 2024
920c1a9
fix test
MehmedGIT Aug 9, 2024
2a843a8
fix: client processing request
MehmedGIT Aug 9, 2024
3a238a7
add: client workflow run
MehmedGIT Aug 9, 2024
50794f9
add timeout and wait to configs
MehmedGIT Aug 9, 2024
cc06fc3
Update src/ocrd_network/client_utils.py
MehmedGIT Aug 12, 2024
4115937
refine status check methods
MehmedGIT Aug 12, 2024
0136db0
add help for new env
MehmedGIT Aug 12, 2024
734bbf0
add cli job status check
MehmedGIT Aug 13, 2024
f86bc23
add: help section to the cli
MehmedGIT Aug 13, 2024
4194f9f
fix: required job id
MehmedGIT Aug 13, 2024
97b3eea
add docstring to cli commands
MehmedGIT Aug 13, 2024
8e7ba26
Fix: rename to block
MehmedGIT Aug 13, 2024
69808b6
Fix: server_utils.py > 404 to 400
MehmedGIT Aug 13, 2024
4de1e83
fix: set ps address if None in constructor
MehmedGIT Aug 13, 2024
d1af85b
fix: check report validation outside try block
MehmedGIT Aug 13, 2024
50f73c5
fix: the annoying string dict
MehmedGIT Aug 13, 2024
8f2861c
add: parameter_override
MehmedGIT Aug 13, 2024
06a371c
add sort to network agents
MehmedGIT Aug 13, 2024
4d85970
add: discovery cli, processors and processor
MehmedGIT Aug 13, 2024
bb3007d
add: check processing job log file
MehmedGIT Aug 13, 2024
ff4243f
fix: exception handling
MehmedGIT Aug 14, 2024
5f746c1
ocrd network client: parse parameters and overrides
kba Aug 14, 2024
8fc8bff
fix parameter parsing again
kba Aug 14, 2024
d73cfaa
Merge pull request #1270 from OCR-D/fix-parsing
MehmedGIT Aug 20, 2024
15cea57
:memo: changelog
kba Aug 22, 2024
18d743a
Merge branch 'master' into extend-network-client
kba Aug 22, 2024
6608539
refactor client cli: process -> run
MehmedGIT Aug 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ network-module-test: assets
INTEGRATION_TEST_IN_DOCKER = docker exec core_test
network-integration-test:
$(DOCKER_COMPOSE) --file tests/network/docker-compose.yml up -d
-$(INTEGRATION_TEST_IN_DOCKER) pytest -k 'test_integration_' -v --ignore-glob="$(TESTDIR)/network/*ocrd_all*.py"
-$(INTEGRATION_TEST_IN_DOCKER) pytest -k 'test_integration_' -v --ignore-glob="tests/network/*ocrd_all*.py"
kba marked this conversation as resolved.
Show resolved Hide resolved
$(DOCKER_COMPOSE) --file tests/network/docker-compose.yml down --remove-orphans

network-integration-test-cicd:
Expand Down
4 changes: 4 additions & 0 deletions src/ocrd/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
\b
{config.describe('OCRD_MAX_PROCESSOR_CACHE')}
\b
{config.describe('OCRD_NETWORK_CLIENT_POLLING_SLEEP')}
\b
{config.describe('OCRD_NETWORK_CLIENT_POLLING_TIMEOUT')}
\b
{config.describe('OCRD_NETWORK_SERVER_ADDR_PROCESSING')}
\b
{config.describe('OCRD_NETWORK_SERVER_ADDR_WORKFLOW')}
Expand Down
108 changes: 82 additions & 26 deletions src/ocrd_network/cli/client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import click
from json import dumps, loads
from typing import Optional

from ocrd.decorators import parameter_option
from ocrd_network import Client
from ocrd_utils import DEFAULT_METS_BASENAME
from ..client import Client


@click.group('client')
Expand Down Expand Up @@ -31,9 +31,24 @@ def processing_cli():
pass


@processing_cli.command('check-status')
@click.option('--address', help='The address of the Processing Server. If not provided, '
              'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default')
@click.option('-j', '--processing-job-id', required=True)
def check_processing_job_status(address: Optional[str], processing_job_id: str):
    """
    Check the status of a previously submitted processing job.
    """
    # Client falls back to config.OCRD_NETWORK_SERVER_ADDR_PROCESSING when address is None
    client = Client(server_addr_processing=address)
    # Queries GET /processor/job/{processing_job_id} on the Processing Server
    job_status = client.check_job_status(processing_job_id)
    # NOTE(review): assert is stripped under `python -O`; an explicit check would be safer
    assert job_status
    print(f"Processing job status: {job_status}")


@processing_cli.command('processor')
kba marked this conversation as resolved.
Show resolved Hide resolved
@click.argument('processor_name', required=True, type=click.STRING)
@click.option('--address')
@click.option('--address', help='The address of the Processing Server. If not provided, '
'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default')
@click.option('-m', '--mets', required=True, default=DEFAULT_METS_BASENAME)
@click.option('-I', '--input-file-grp', default='OCR-D-INPUT')
@click.option('-O', '--output-file-grp', default='OCR-D-OUTPUT')
Expand All @@ -42,26 +57,32 @@ def processing_cli():
@click.option('--result-queue-name')
@click.option('--callback-url')
@click.option('--agent-type', default='worker')
def send_processing_request(
address: Optional[str],
processor_name: str,
mets: str,
input_file_grp: str,
output_file_grp: Optional[str],
page_id: Optional[str],
parameter: Optional[dict],
result_queue_name: Optional[str],
callback_url: Optional[str],
# TODO: This is temporally available to toggle
# between the ProcessingWorker/ProcessorServer
agent_type: Optional[str]
@click.option('-b', '--block', default=False,
help='If set, the client will block till job timeout, fail or success.')
def send_processing_job_request(
address: Optional[str],
processor_name: str,
mets: str,
input_file_grp: str,
output_file_grp: Optional[str],
page_id: Optional[str],
parameter: Optional[dict],
result_queue_name: Optional[str],
callback_url: Optional[str],
# TODO: This is temporally available to toggle
# between the ProcessingWorker/ProcessorServer
agent_type: Optional[str],
block: Optional[bool]
):
"""
Submit a processing job to the processing server.
"""
req_params = {
"path_to_mets": mets,
"description": "OCR-D Network client request",
"input_file_grps": input_file_grp.split(','),
"parameters": parameter if parameter else {},
"agent_type": agent_type,
"agent_type": agent_type
}
if output_file_grp:
req_params["output_file_grps"] = output_file_grp.split(',')
Expand All @@ -71,16 +92,13 @@ def send_processing_request(
req_params["result_queue_name"] = result_queue_name
if callback_url:
req_params["callback_url"] = callback_url

client = Client(
server_addr_processing=address
)
response = client.send_processing_request(
processor_name=processor_name,
req_params=req_params
)
processing_job_id = response.get('job_id', None)
client = Client(server_addr_processing=address)
processing_job_id = client.send_processing_job_request(
processor_name=processor_name, req_params=loads(dumps(req_params)))
assert processing_job_id
print(f"Processing job id: {processing_job_id}")
if block:
client.poll_job_status(job_id=processing_job_id)


@client_cli.group('workflow')
Expand All @@ -91,6 +109,44 @@ def workflow_cli():
pass


@workflow_cli.command('check-status')
@click.option('--address', help='The address of the Processing Server. If not provided, '
              'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default')
@click.option('-j', '--workflow-job-id', required=True)
def check_workflow_job_status(address: Optional[str], workflow_job_id: str):
    """
    Check the status of a previously submitted workflow job.
    """
    # Client falls back to config.OCRD_NETWORK_SERVER_ADDR_PROCESSING when address is None;
    # workflow jobs are also served by the Processing Server (not a separate Workflow Server)
    client = Client(server_addr_processing=address)
    # Queries GET /workflow/job-simple/{workflow_job_id} on the Processing Server
    job_status = client.check_workflow_status(workflow_job_id)
    # NOTE(review): assert is stripped under `python -O`; an explicit check would be safer
    assert job_status
    print(f"Workflow job status: {job_status}")


@workflow_cli.command('run')
@click.option('--address', help='The address of the Processing Server. If not provided, '
              'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default')
@click.option('-m', '--path-to-mets', required=True)
@click.option('-w', '--path-to-workflow', required=True)
# is_flag=True makes -b/--block a value-less switch, matching the "If set, ..." help text;
# without it click would require an argument after -b.
@click.option('-b', '--block', is_flag=True, default=False,
              help='If set, the client will block till job timeout, fail or success.')
def send_workflow_job_request(
    address: Optional[str],
    path_to_mets: str,
    path_to_workflow: str,
    block: bool
):
    """
    Submit a workflow job to the processing server.
    """
    # Client falls back to config.OCRD_NETWORK_SERVER_ADDR_PROCESSING when address is None
    client = Client(server_addr_processing=address)
    workflow_job_id = client.send_workflow_job_request(path_to_wf=path_to_workflow, path_to_mets=path_to_mets)
    # Explicit check instead of assert: asserts are removed under `python -O`
    if not workflow_job_id:
        raise ValueError(f"Empty workflow job id for workflow request to: {address}")
    print(f"Workflow job id: {workflow_job_id}")
    if block:
        # Poll until the job reaches a terminal state or the client polling timeout expires
        client.poll_workflow_status(job_id=workflow_job_id)


@client_cli.group('workspace')
def workspace_cli():
"""
Expand Down
66 changes: 39 additions & 27 deletions src/ocrd_network/client.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,49 @@
from json import dumps, loads
from requests import post as requests_post
from ocrd_utils import config, getLogger, LOG_FORMAT
from .client_utils import (
get_ps_processing_job_status,
get_ps_workflow_job_status,
poll_job_status_till_timeout_fail_or_success,
poll_wf_status_till_timeout_fail_or_success,
post_ps_processing_request,
post_ps_workflow_request,
verify_server_protocol
)

from .constants import NETWORK_PROTOCOLS


# TODO: This is just a conceptual implementation and first try to
# trigger further discussions on how this should look like.
class Client:
def __init__(
self,
server_addr_processing: str = config.OCRD_NETWORK_SERVER_ADDR_PROCESSING,
server_addr_workflow: str = config.OCRD_NETWORK_SERVER_ADDR_WORKFLOW,
server_addr_workspace: str = config.OCRD_NETWORK_SERVER_ADDR_WORKSPACE
server_addr_processing: str,
kba marked this conversation as resolved.
Show resolved Hide resolved
timeout: int = config.OCRD_NETWORK_CLIENT_POLLING_TIMEOUT,
wait: int = config.OCRD_NETWORK_CLIENT_POLLING_SLEEP
):
self.log = getLogger(f"ocrd_network.client")
if not server_addr_processing:
server_addr_processing = config.OCRD_NETWORK_SERVER_ADDR_PROCESSING
self.server_addr_processing = server_addr_processing
self.server_addr_workflow = server_addr_workflow
self.server_addr_workspace = server_addr_workspace

def send_processing_request(self, processor_name: str, req_params: dict):
verify_server_protocol(self.server_addr_processing)
req_url = f"{self.server_addr_processing}/processor/{processor_name}"
req_headers = {"Content-Type": "application/json; charset=utf-8"}
req_json = loads(dumps(req_params))
self.log.info(f"Sending processing request to: {req_url}")
self.log.debug(req_json)
response = requests_post(url=req_url, headers=req_headers, json=req_json)
return response.json()


def verify_server_protocol(address: str):
for protocol in NETWORK_PROTOCOLS:
if address.startswith(protocol):
return
raise ValueError(f"Wrong/Missing protocol in the server address: {address}, must be one of: {NETWORK_PROTOCOLS}")
self.polling_timeout = timeout
self.polling_wait = wait
self.polling_tries = int(timeout/wait)

def check_job_status(self, job_id: str):
return get_ps_processing_job_status(self.server_addr_processing, processing_job_id=job_id)

def check_workflow_status(self, workflow_job_id: str):
return get_ps_workflow_job_status(self.server_addr_processing, workflow_job_id=workflow_job_id)

def poll_job_status(self, job_id: str) -> str:
return poll_job_status_till_timeout_fail_or_success(
MehmedGIT marked this conversation as resolved.
Show resolved Hide resolved
ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait)

def poll_workflow_status(self, job_id: str) -> str:
return poll_wf_status_till_timeout_fail_or_success(
ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait)

def send_processing_job_request(self, processor_name: str, req_params: dict) -> str:
return post_ps_processing_request(
ps_server_host=self.server_addr_processing, processor=processor_name, job_input=req_params)

def send_workflow_job_request(self, path_to_wf: str, path_to_mets: str):
return post_ps_workflow_request(
ps_server_host=self.server_addr_processing, path_to_wf=path_to_wf, path_to_mets=path_to_mets)
81 changes: 81 additions & 0 deletions src/ocrd_network/client_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from requests import get as request_get, post as request_post
from time import sleep
from .constants import JobState, NETWORK_PROTOCOLS


def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int):
    """Poll the status endpoint of a processing or workflow job until it terminates.

    Sleeps ``wait`` seconds before each status query and stops early once the job
    reports success or failure. Performs at most ``tries`` queries.

    :param ps_server_host: base address of the Processing Server
    :param job_id: identifier of the job to poll
    :param job_type: either ``"workflow"`` or ``"processor"``
    :param tries: maximum number of polling attempts
    :param wait: seconds to sleep before each attempt
    :return: the last observed ``JobState`` (``JobState.unset`` if never queried)
    :raises ValueError: on an unsupported ``job_type``
    """
    if job_type not in ("workflow", "processor"):
        raise ValueError(f"Unknown job type '{job_type}', expected 'workflow' or 'processor'")
    job_state = JobState.unset
    for _ in range(tries):
        sleep(wait)
        if job_type == "processor":
            job_state = get_ps_processing_job_status(ps_server_host, job_id)
        else:
            job_state = get_ps_workflow_job_status(ps_server_host, job_id)
        if job_state in (JobState.success, JobState.failed):
            break
    return job_state


def poll_job_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str, tries: int, wait: int) -> JobState:
    """Block until the processing job terminates or the polling attempts are exhausted."""
    return _poll_endpoint_status(ps_server_host, job_id, job_type="processor", tries=tries, wait=wait)


def poll_wf_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str, tries: int, wait: int) -> JobState:
    """Block until the workflow job terminates or the polling attempts are exhausted."""
    return _poll_endpoint_status(ps_server_host, job_id, job_type="workflow", tries=tries, wait=wait)


def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> str:
    """Fetch the current state of a processing job from the Processing Server.

    :param ps_server_host: base address of the Processing Server
    :param processing_job_id: identifier of the processing job
    :return: the job state string reported by the server
    :raises ValueError: on a non-200 response or an empty state
    """
    request_url = f"{ps_server_host}/processor/job/{processing_job_id}"
    response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"})
    # Explicit raise instead of assert: asserts are stripped under `python -O`,
    # which would silently hide server errors.
    if response.status_code != 200:
        raise ValueError(f"Processing server: {request_url}, {response.status_code}")
    job_state = response.json()["state"]
    if not job_state:
        raise ValueError(f"Empty state for processing job: {processing_job_id}")
    return job_state


def get_ps_workflow_job_status(ps_server_host: str, workflow_job_id: str) -> str:
    """Fetch the current state of a workflow job from the Processing Server.

    :param ps_server_host: base address of the Processing Server
    :param workflow_job_id: identifier of the workflow job
    :return: the job state string reported by the server
    :raises ValueError: on a non-200 response or an empty state
    """
    request_url = f"{ps_server_host}/workflow/job-simple/{workflow_job_id}"
    response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"})
    # Explicit raise instead of assert: asserts are stripped under `python -O`,
    # which would silently hide server errors.
    if response.status_code != 200:
        raise ValueError(f"Processing server: {request_url}, {response.status_code}")
    job_state = response.json()["state"]
    if not job_state:
        raise ValueError(f"Empty state for workflow job: {workflow_job_id}")
    return job_state


def post_ps_processing_request(ps_server_host: str, processor: str, job_input: dict) -> str:
    """Submit a processing job for *processor* to the Processing Server.

    :param ps_server_host: base address of the Processing Server
    :param processor: name of the processor to run
    :param job_input: JSON-serializable request body for the job
    :return: the job id assigned by the server
    :raises ValueError: on a non-200 response or an empty job id
    """
    request_url = f"{ps_server_host}/processor/run/{processor}"
    response = request_post(
        url=request_url,
        headers={"accept": "application/json; charset=utf-8"},
        json=job_input
    )
    # Explicit raise instead of assert: asserts are stripped under `python -O`,
    # which would silently hide server errors.
    if response.status_code != 200:
        raise ValueError(f"Processing server: {request_url}, {response.status_code}")
    processing_job_id = response.json()["job_id"]
    if not processing_job_id:
        raise ValueError(f"Empty job id for processing request to: {request_url}")
    return processing_job_id


# TODO: Can be extended to include other parameters such as page_wise
def post_ps_workflow_request(ps_server_host: str, path_to_wf: str, path_to_mets: str) -> str:
    """Submit a workflow job to the Processing Server.

    :param ps_server_host: base address of the Processing Server
    :param path_to_wf: local path to the workflow file to upload
    :param path_to_mets: path to the METS file, passed through to the server
    :return: the workflow job id assigned by the server
    :raises ValueError: on a non-200 response or an empty job id
    """
    request_url = f"{ps_server_host}/workflow/run?mets_path={path_to_mets}&page_wise=True"
    # Context manager ensures the workflow file handle is closed even if the POST fails
    with open(path_to_wf, "rb") as workflow_file:
        response = request_post(
            url=request_url,
            headers={"accept": "application/json; charset=utf-8"},
            files={"workflow": workflow_file}
        )
    # Explicit raise instead of assert: asserts are stripped under `python -O`,
    # which would silently hide server errors.
    if response.status_code != 200:
        raise ValueError(f"Processing server: {request_url}, {response.status_code}")
    wf_job_id = response.json()["job_id"]
    if not wf_job_id:
        raise ValueError(f"Empty job id for workflow request to: {request_url}")
    return wf_job_id


def verify_server_protocol(address: str):
    """Ensure *address* starts with one of the supported network protocols.

    :raises ValueError: when no supported protocol prefix matches
    """
    if any(address.startswith(protocol) for protocol in NETWORK_PROTOCOLS):
        return
    raise ValueError(f"Wrong/Missing protocol in the server address: {address}, must be one of: {NETWORK_PROTOCOLS}")
12 changes: 7 additions & 5 deletions src/ocrd_network/server_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ def parse_workflow_tasks(logger: Logger, workflow_content: str) -> List[Processo


def raise_http_exception(logger: Logger, status_code: int, message: str, error: Exception = None) -> None:
    """Log *message* (appending the causing *error*, if any) and raise an HTTPException.

    :param logger: logger to record the failure on
    :param status_code: HTTP status code for the raised exception
    :param message: human-readable error description, also used as the HTTP detail
    :param error: optional underlying exception to include in the message
    :raises HTTPException: always, with ``detail`` set to the (possibly extended) message
    """
    if error:
        message = f"{message} {error}"
    # logger.exception also records the active exception's traceback when called
    # from an `except` block; plain `message` avoids a redundant f-string wrapper.
    logger.exception(message)
    raise HTTPException(status_code=status_code, detail=message)


Expand All @@ -210,12 +212,12 @@ def validate_job_input(logger: Logger, processor_name: str, ocrd_tool: dict, job
raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
try:
report = ParameterValidator(ocrd_tool).validate(dict(job_input.parameters))
if not report.is_valid:
message = f"Failed to validate processing job input against the tool json of processor: {processor_name}\n"
raise_http_exception(logger, status.HTTP_404_BAD_REQUEST, message + report.errors)
except Exception as error:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would this exception be raised? Validators should not raise errors but return a report. If you don't have a specific use case, it might be better to just not do try/except so that potential errors are actually raised and fixed.

Copy link
Contributor Author

@MehmedGIT MehmedGIT Aug 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to be extra secure against unexpected errors. The workers or the server (network agents as a service) should ideally never crash due to some error. Especially with an HTTP 500 error on the client side and leaving the network agent in an unpredictable state. The exceptions from problematic requests are still logged.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood, but errors in

report = ParameterValidator(ocrd_tool).validate(dict(job_input.parameters))

would mean that something is broken in our implementation or in jsonschema. So this would likely not be a fluke that happens once, we would need to stop processing and fix the bug at the source. The only variable factor is the job_input.parameters dict and if user-provided input breaks the validator - which it must absolutely never do - we should fix that in the validator.

Copy link
Contributor Author

@MehmedGIT MehmedGIT Aug 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only variable factor is the job_input.parameters dict and if user-provided input breaks the validator - which it must absolutely never do - we should fix that in the validator.

A failing validator would also probably fail most of the processing/workflow jobs. Input from the user breaking down the entire infrastructure is conceptually wrong to me. Even if all the processing jobs fail, the Processing Server should still respond to other requests such as checking logs of old jobs. We would rather need a mechanism to prevent further submission of processing jobs in case X amount of jobs fail in a row - potentially preventing further requests only to the processing endpoint till it is fixed.

There is also currently no graceful shutdown for the processing server. I.e., once the server dies, anything inside the internal processing cache of the server (not the RabbitMQ) will be lost.

message = f"Failed to validate processing job input against the ocrd tool json of processor: {processor_name}"
raise_http_exception(logger, status.HTTP_404_BAD_REQUEST, message, error)
raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, message, error)
if report and not report.is_valid:
message = f"Failed to validate processing job input against the tool json of processor: {processor_name}\n"
raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, f"{message}{report.errors}")


def validate_workflow(logger: Logger, workflow: str) -> None:
Expand Down
10 changes: 10 additions & 0 deletions src/ocrd_utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ def _ocrd_download_timeout_parser(val):
description="Default address of Processing Server to connect to (for `ocrd network client processing`).",
default=(True, ''))

config.add("OCRD_NETWORK_CLIENT_POLLING_SLEEP",
description="How many seconds to sleep before trying again.",
parser=int,
default=(True, 30))

config.add("OCRD_NETWORK_CLIENT_POLLING_TIMEOUT",
description="Timeout for a blocking ocrd network client (in seconds).",
parser=int,
default=(True, 3600))

config.add("OCRD_NETWORK_SERVER_ADDR_WORKFLOW",
description="Default address of Workflow Server to connect to (for `ocrd network client workflow`).",
default=(True, ''))
Expand Down
14 changes: 14 additions & 0 deletions tests/network/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@
parser=_ocrd_download_timeout_parser
)

test_config.add(
"OCRD_NETWORK_CLIENT_POLLING_SLEEP",
description="How many seconds to sleep before trying again.",
parser=int,
default=(True, 30)
)

test_config.add(
"OCRD_NETWORK_CLIENT_POLLING_TIMEOUT",
description="Timeout for a blocking ocrd network client (in seconds).",
parser=int,
default=(True, 3600)
)

test_config.add(
name="OCRD_NETWORK_SERVER_ADDR_PROCESSING",
description="Default address of Processing Server to connect to (for `ocrd network client processing`).",
Expand Down
Loading