From 4ba360b10c3f5b8cbae515e7ddd506626f9231fc Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Tue, 28 Jan 2025 16:19:44 +0000 Subject: [PATCH 1/7] Improve server-client communication error handling --- cylc/flow/network/__init__.py | 70 +++++++++++++---- cylc/flow/network/client.py | 70 ++++++++++------- cylc/flow/network/multi.py | 28 +++++-- cylc/flow/network/replier.py | 54 ++++++++----- cylc/flow/network/server.py | 122 +++++++++++++++++------------- cylc/flow/run_modes/dummy.py | 19 +++-- cylc/flow/run_modes/simulation.py | 20 +++-- cylc/flow/run_modes/skip.py | 21 +++-- cylc/flow/task_outputs.py | 9 ++- tests/integration/test_client.py | 35 +++++++++ tests/integration/test_replier.py | 28 ++++--- tests/integration/test_server.py | 66 +++++++++++----- 12 files changed, 377 insertions(+), 165 deletions(-) diff --git a/cylc/flow/network/__init__.py b/cylc/flow/network/__init__.py index 42b79475ca5..882fe9b99d1 100644 --- a/cylc/flow/network/__init__.py +++ b/cylc/flow/network/__init__.py @@ -18,7 +18,12 @@ import asyncio import getpass import json -from typing import Optional, Tuple +from typing import ( + TYPE_CHECKING, + Optional, + Tuple, + Union, +) import zmq import zmq.asyncio @@ -30,34 +35,71 @@ CylcError, CylcVersionError, ServiceFileError, - WorkflowStopped + WorkflowStopped, ) from cylc.flow.hostuserutil import get_fqdn_by_host from cylc.flow.workflow_files import ( ContactFileFields, - KeyType, - KeyOwner, KeyInfo, + KeyOwner, + KeyType, + get_workflow_srv_dir, load_contact_file, - get_workflow_srv_dir ) + +if TYPE_CHECKING: + # BACK COMPAT: typing_extensions.TypedDict + # FROM: Python 3.7 + # TO: Python 3.11 + from typing_extensions import TypedDict + + API = 5 # cylc API version MSG_TIMEOUT = "TIMEOUT" +if TYPE_CHECKING: + class ResponseDict(TypedDict, total=False): + """Structure of server response messages. 
-def encode_(message): - """Convert the structure holding a message field from JSON to a string.""" - try: - return json.dumps(message) - except TypeError as exc: - return json.dumps({'errors': [{'message': str(exc)}]}) + Confusingly, has similar format to GraphQL execution result. + But if we change this now we could break compatibility for + issuing commands to/receiving responses from workflows running in + different versions of Cylc 8. + """ + data: object + """For most Cylc commands that issue GQL mutations, the data field will + look like: + data: { + : { + result: [ + { + id: , + response: [, ] + }, + ... + ] + } + } + but this is not 100% consistent unfortunately + """ + error: Union[Exception, str, dict] + """If an error occurred that could not be handled. + (usually a dict {message: str, traceback?: str}). + """ + user: str + cylc_version: str + """Server (i.e. running workflow) Cylc version. + + Going forward, we include this so we can more easily handle any future + back-compat issues.""" -def decode_(message): - """Convert an encoded message string to JSON with an added 'user' field.""" +def load_server_response(message: str) -> 'ResponseDict': + """Convert a JSON message string to dict with an added 'user' field.""" msg = json.loads(message) - msg['user'] = getpass.getuser() # assume this is the user + if 'user' not in msg: + msg['user'] = getpass.getuser() # assume this is the user return msg diff --git a/cylc/flow/network/client.py b/cylc/flow/network/client.py index 099ef8bc0ff..20a8afb0552 100644 --- a/cylc/flow/network/client.py +++ b/cylc/flow/network/client.py @@ -15,18 +15,31 @@ # along with this program. If not, see . 
"""Client for workflow runtime API.""" -from abc import ABCMeta, abstractmethod +from abc import ( + ABCMeta, + abstractmethod, +) import asyncio +import json import os from shutil import which import socket import sys -from typing import Any, Optional, Union, Dict +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Optional, + Union, +) import zmq import zmq.asyncio -from cylc.flow import LOG +from cylc.flow import ( + LOG, + __version__ as CYLC_VERSION, +) from cylc.flow.exceptions import ( ClientError, ClientTimeout, @@ -36,16 +49,17 @@ ) from cylc.flow.hostuserutil import get_fqdn_by_host from cylc.flow.network import ( - encode_, - decode_, + ZMQSocketBase, get_location, - ZMQSocketBase + load_server_response, ) from cylc.flow.network.client_factory import CommsMeth from cylc.flow.network.server import PB_METHOD_MAP -from cylc.flow.workflow_files import ( - detect_old_contact_file, -) +from cylc.flow.workflow_files import detect_old_contact_file + + +if TYPE_CHECKING: + from cylc.flow.network import ResponseDict class WorkflowRuntimeClientBase(metaclass=ABCMeta): @@ -270,7 +284,7 @@ async def async_request( args: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, req_meta: Optional[Dict[str, Any]] = None - ) -> object: + ) -> Union[bytes, object]: """Send an asynchronous request using asyncio. Has the same arguments and return values as ``serial_request``. @@ -292,12 +306,12 @@ async def async_request( if req_meta: msg['meta'].update(req_meta) LOG.debug('zmq:send %s', msg) - message = encode_(msg) + message = json.dumps(msg) self.socket.send_string(message) # receive response if self.poller.poll(timeout): - res = await self.socket.recv() + res: bytes = await self.socket.recv() else: self.timeout_handler() raise ClientTimeout( @@ -307,26 +321,28 @@ async def async_request( ' --comms-timeout option;' '\n* or check the workflow log.' 
) + LOG.debug('zmq:recv %s', res) - if msg['command'] in PB_METHOD_MAP: - response = {'data': res} - else: - response = decode_( - res.decode() if isinstance(res, bytes) else res - ) - LOG.debug('zmq:recv %s', response) + if command in PB_METHOD_MAP: + return res + + response: ResponseDict = load_server_response(res.decode()) try: return response['data'] except KeyError: - error = response.get( - 'error', - {'message': f'Received invalid response: {response}'}, - ) - raise ClientError( - error.get('message'), # type: ignore - error.get('traceback'), # type: ignore - ) from None + error = response.get('error') + if not error: + error = ( + f"Received invalid response for Cylc {CYLC_VERSION}: " + f"{response}" + ) + wflow_cylc_ver = response.get('cylc_version') + if wflow_cylc_ver: + error += ( + f"\n(Workflow is running in Cylc {wflow_cylc_ver})" + ) + raise ClientError(str(error)) from None def get_header(self) -> dict: """Return "header" data to attach to each request for traceability. diff --git a/cylc/flow/network/multi.py b/cylc/flow/network/multi.py index 9c190f68799..a61c6d05bd2 100644 --- a/cylc/flow/network/multi.py +++ b/cylc/flow/network/multi.py @@ -16,12 +16,23 @@ import asyncio import sys -from typing import Callable, Dict, List, Tuple, Optional, Union, Type +from typing import ( + Callable, + Dict, + List, + Optional, + Tuple, + Type, + Union, +) from ansimarkup import ansiprint from cylc.flow.async_util import unordered_map -from cylc.flow.exceptions import CylcError, WorkflowStopped +from cylc.flow.exceptions import ( + CylcError, + WorkflowStopped, +) import cylc.flow.flags from cylc.flow.id_cli import parse_ids_async from cylc.flow.terminal import DIM @@ -220,14 +231,15 @@ def _process_response( def _report( - response: dict, + response: Union[dict, list], ) -> Tuple[Optional[str], Optional[str], bool]: """Report the result of a GraphQL operation. This analyses GraphQL mutation responses to determine the outcome. 
Args: - response: The GraphQL response. + response: The workflow server response (NOT necessarily conforming to + GraphQL execution result spec). Returns: (stdout, stderr, outcome) @@ -235,6 +247,12 @@ def _report( """ try: ret: List[Tuple[Optional[str], Optional[str], bool]] = [] + if not isinstance(response, dict): + if isinstance(response, list) and response[0].get('error'): + # If operating on workflow running in older Cylc version, + # may get a error response like [{'error': '...'}] + raise Exception(response) + raise Exception(f"Unexpected response: {response}") for mutation_response in response.values(): # extract the result of each mutation result in the response success, msg = mutation_response['result'][0]['response'] @@ -268,7 +286,7 @@ def _report( # response returned is not in the expected format - this shouldn't # happen but we need to protect against it err_msg = '' - if cylc.flow.flags.verbosity > 1: # debug mode + if cylc.flow.flags.verbosity > 0: # verbose mode # print the full result to stderr err_msg += f'\n <{DIM}>response={response}' return ( diff --git a/cylc/flow/network/replier.py b/cylc/flow/network/replier.py index 09bfb55f662..98555a1166f 100644 --- a/cylc/flow/network/replier.py +++ b/cylc/flow/network/replier.py @@ -15,15 +15,27 @@ # along with this program. If not, see . 
"""Server for workflow runtime API.""" +import json from queue import Queue -from typing import TYPE_CHECKING, Optional +from typing import ( + TYPE_CHECKING, + Optional, +) import zmq -from cylc.flow import LOG -from cylc.flow.network import encode_, decode_, ZMQSocketBase +from cylc.flow import ( + LOG, + __version__ as CYLC_VERSION, +) +from cylc.flow.network import ( + ZMQSocketBase, + load_server_response, +) + if TYPE_CHECKING: + from cylc.flow.network import ResponseDict from cylc.flow.network.server import WorkflowRuntimeServer @@ -69,7 +81,7 @@ def _bespoke_stop(self) -> None: LOG.debug('stopping zmq replier...') self.queue.put('STOP') - def listener(self): + def listener(self) -> None: """The server main loop, listen for and serve requests. When called, this method will receive and respond until there are no @@ -90,7 +102,9 @@ def listener(self): try: # Check for messages - msg = self.socket.recv_string(zmq.NOBLOCK) + msg = self.socket.recv_string( # type: ignore[union-attr] + zmq.NOBLOCK + ) except zmq.error.Again: # No messages, break to parent loop/caller. 
break @@ -99,27 +113,27 @@ def listener(self): continue # attempt to decode the message, authenticating the user in the # process + res: ResponseDict + response: bytes try: - message = decode_(msg) + message = load_server_response(msg) except Exception as exc: # purposefully catch generic exception # failed to decode message, possibly resulting from failed # authentication - LOG.exception('failed to decode message: "%s"', exc) - import traceback - response = encode_( - { - 'error': { - 'message': 'failed to decode message: "%s"' % msg, - 'traceback': traceback.format_exc(), - } - } - ).encode() + LOG.exception(exc) + LOG.error(f'failed to decode message: "{msg}"') + res = { + 'error': {'message': str(exc)}, + 'cylc_version': CYLC_VERSION, + } + response = json.dumps(res).encode() else: # success case - serve the request res = self.server.receiver(message) + data = res.get('data') # send back the string to bytes response - if isinstance(res.get('data'), bytes): - response = res['data'] + if isinstance(data, bytes): + response = data else: - response = encode_(res).encode() - self.socket.send(response) + response = json.dumps(res).encode() + self.socket.send(response) # type: ignore[union-attr] diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py index 2c170e61198..9a44d46921e 100644 --- a/cylc/flow/network/server.py +++ b/cylc/flow/network/server.py @@ -19,29 +19,46 @@ from queue import Queue from textwrap import dedent from time import sleep -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Union +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Iterable, + List, + Optional, + Union, +) from graphql.execution.executors.asyncio import AsyncioExecutor import zmq from zmq.auth.thread import ThreadAuthenticator -from cylc.flow import LOG, workflow_files +from cylc.flow import ( + LOG, + __version__ as CYLC_VERSION, + workflow_files, +) from cylc.flow.cfgspec.glbl_cfg import glbl_cfg +from cylc.flow.data_messages_pb2 import 
PbEntireWorkflow +from cylc.flow.data_store_mgr import DELTAS_MAP from cylc.flow.network.authorisation import authorise from cylc.flow.network.graphql import ( - CylcGraphQLBackend, IgnoreFieldMiddleware, instantiate_middleware + CylcGraphQLBackend, + IgnoreFieldMiddleware, + instantiate_middleware, ) from cylc.flow.network.publisher import WorkflowPublisher from cylc.flow.network.replier import WorkflowReplier from cylc.flow.network.resolvers import Resolvers from cylc.flow.network.schema import schema -from cylc.flow.data_store_mgr import DELTAS_MAP -from cylc.flow.data_messages_pb2 import PbEntireWorkflow + if TYPE_CHECKING: - from cylc.flow.scheduler import Scheduler from graphql.execution import ExecutionResult + from cylc.flow.network import ResponseDict + from cylc.flow.scheduler import Scheduler + # maps server methods to the protobuf message (for client/UIS import) PB_METHOD_MAP: Dict[str, Any] = { @@ -267,7 +284,7 @@ async def publish_queued_items(self) -> None: articles = self.publish_queue.get() await self.publisher.publish(*articles) - def receiver(self, message): + def receiver(self, message) -> 'ResponseDict': """Process incoming messages and coordinate response. 
Wrap incoming messages, dispatch them to exposed methods and/or @@ -285,26 +302,44 @@ def receiver(self, message): args.update({'user': message['user']}) if 'meta' in message: args['meta'] = message['meta'] - except KeyError: + except KeyError as exc: # malformed message - return {'error': { - 'message': 'Request missing required field(s).'}} + return { + 'error': { + 'message': ( + f"Request missing field {exc} required for " + f"Cylc {CYLC_VERSION}" + ) + }, + 'cylc_version': CYLC_VERSION, + } except AttributeError: # no exposed method by that name - return {'error': { - 'message': 'No method by the name "%s"' % message['command']}} + return { + 'error': { + 'message': ( + f"No method by the name '{message['command']}' " + f"at Cylc {CYLC_VERSION}" + ) + }, + 'cylc_version': CYLC_VERSION, + } # generate response try: - response = method(**args) + data = method(**args) except Exception as exc: # includes incorrect arguments (TypeError) - LOG.exception(exc) # note the error server side - import traceback - return {'error': { - 'message': str(exc), 'traceback': traceback.format_exc()}} + LOG.exception(exc) # log the error server side + return { + 'error': {'message': str(exc)}, + 'cylc_version': CYLC_VERSION, + } - return {'data': response} + return { + 'data': data, + 'cylc_version': CYLC_VERSION, + } def register_endpoints(self): """Register all exposed methods.""" @@ -357,7 +392,7 @@ def graphql( variables: Optional[Dict[str, Any]] = None, meta: Optional[Dict[str, Any]] = None ): - """Return the GraphQL schema execution result. + """Return the data field of the GraphQL schema execution result. Args: request_string: GraphQL request passed to Graphene. @@ -367,41 +402,24 @@ def graphql( Returns: object: Execution result, or a list with errors. 
""" - try: - executed: 'ExecutionResult' = schema.execute( - request_string, - variable_values=variables, - context_value={ - 'resolvers': self.resolvers, - 'meta': meta or {}, - }, - backend=CylcGraphQLBackend(), - middleware=list(instantiate_middleware(self.middleware)), - executor=AsyncioExecutor(), - validate=True, # validate schema (dev only? default is True) - return_promise=False, - ) - except Exception as exc: - return 'ERROR: GraphQL execution error \n%s' % exc + executed: 'ExecutionResult' = schema.execute( + request_string, + variable_values=variables, + context_value={ + 'resolvers': self.resolvers, + 'meta': meta or {}, + }, + backend=CylcGraphQLBackend(), + middleware=list(instantiate_middleware(self.middleware)), + executor=AsyncioExecutor(), + validate=True, # validate schema (dev only? default is True) + return_promise=False, + ) if executed.errors: - errors: List[Any] = [] for error in executed.errors: - LOG.error(error) - if hasattr(error, '__traceback__'): - import traceback - formatted_tb = traceback.format_exception( - type(error), error, error.__traceback__ - ) - LOG.error("".join(formatted_tb)) - errors.append({ - 'error': { - 'message': str(error), - 'traceback': formatted_tb - } - }) - continue - errors.append(getattr(error, 'message', None)) - return errors + LOG.warning(error) + if not executed.data: + raise Exception(executed.errors[0]) return executed.data # UIServer Data Commands diff --git a/cylc/flow/run_modes/dummy.py b/cylc/flow/run_modes/dummy.py index 26d887d87dc..a0c090b7386 100644 --- a/cylc/flow/run_modes/dummy.py +++ b/cylc/flow/run_modes/dummy.py @@ -18,22 +18,31 @@ Dummy mode shares settings with simulation mode. 
""" -from typing import TYPE_CHECKING, Any, Dict, Tuple +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Tuple, +) +from cylc.flow.platforms import get_platform +from cylc.flow.run_modes import RunMode from cylc.flow.run_modes.simulation import ( ModeSettings, disable_platforms, get_simulated_run_len, - parse_fail_cycle_points + parse_fail_cycle_points, ) -from cylc.flow.run_modes import RunMode -from cylc.flow.platforms import get_platform if TYPE_CHECKING: + # BACK COMPAT: typing_extensions.Literal + # FROM: Python 3.7 + # TO: Python 3.8 + from typing_extensions import Literal + from cylc.flow.task_job_mgr import TaskJobManager from cylc.flow.task_proxy import TaskProxy - from typing_extensions import Literal CLEAR_THESE_SCRIPTS = [ diff --git a/cylc/flow/run_modes/simulation.py b/cylc/flow/run_modes/simulation.py index 900a2c1fc4f..8bbb8ccefa1 100644 --- a/cylc/flow/run_modes/simulation.py +++ b/cylc/flow/run_modes/simulation.py @@ -18,9 +18,15 @@ from dataclasses import dataclass from logging import INFO -from typing import ( - TYPE_CHECKING, Any, Dict, List, Tuple, Union) from time import time +from typing import ( + TYPE_CHECKING, + Any, + Dict, + List, + Tuple, + Union, +) from metomi.isodatetime.parsers import DurationParser @@ -29,22 +35,26 @@ from cylc.flow.cycling.loader import get_point from cylc.flow.exceptions import PointParsingError from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM +from cylc.flow.run_modes import RunMode from cylc.flow.task_outputs import TASK_OUTPUT_SUBMITTED from cylc.flow.task_state import ( - TASK_STATUS_RUNNING, TASK_STATUS_FAILED, + TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED, ) from cylc.flow.wallclock import get_unix_time_from_time_string -from cylc.flow.run_modes import RunMode if TYPE_CHECKING: + # BACK COMPAT: typing_extensions.Literal + # FROM: Python 3.7 + # TO: Python 3.8 + from typing_extensions import Literal + from cylc.flow.task_events_mgr import TaskEventsManager from cylc.flow.task_job_mgr import 
TaskJobManager from cylc.flow.task_proxy import TaskProxy from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager - from typing_extensions import Literal def submit_task_job( diff --git a/cylc/flow/run_modes/skip.py b/cylc/flow/run_modes/skip.py index 49736883911..4b0770bd159 100644 --- a/cylc/flow/run_modes/skip.py +++ b/cylc/flow/run_modes/skip.py @@ -17,23 +17,32 @@ """ from logging import INFO from typing import ( - TYPE_CHECKING, Dict, List, Tuple) + TYPE_CHECKING, + Dict, + List, + Tuple, +) from cylc.flow import LOG from cylc.flow.exceptions import WorkflowConfigError +from cylc.flow.run_modes import RunMode from cylc.flow.task_outputs import ( + TASK_OUTPUT_FAILED, + TASK_OUTPUT_STARTED, TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_SUCCEEDED, - TASK_OUTPUT_FAILED, - TASK_OUTPUT_STARTED ) -from cylc.flow.run_modes import RunMode + if TYPE_CHECKING: - from cylc.flow.taskdef import TaskDef + # BACK COMPAT: typing_extensions.Literal + # FROM: Python 3.7 + # TO: Python 3.8 + from typing_extensions import Literal + from cylc.flow.task_job_mgr import TaskJobManager from cylc.flow.task_proxy import TaskProxy - from typing_extensions import Literal + from cylc.flow.taskdef import TaskDef def submit_task_job( diff --git a/cylc/flow/task_outputs.py b/cylc/flow/task_outputs.py index 8548ab405e4..11a3979c860 100644 --- a/cylc/flow/task_outputs.py +++ b/cylc/flow/task_outputs.py @@ -18,12 +18,12 @@ import ast import re from typing import ( + TYPE_CHECKING, Dict, Iterable, Iterator, List, Optional, - TYPE_CHECKING, Tuple, Union, ) @@ -35,10 +35,15 @@ restricted_evaluator, ) + if TYPE_CHECKING: - from cylc.flow.taskdef import TaskDef + # BACK COMPAT: typing_extensions.Literal + # FROM: Python 3.7 + # TO: Python 3.8 from typing_extensions import Literal + from cylc.flow.taskdef import TaskDef + # Standard task output strings, used for triggering. 
TASK_OUTPUT_EXPIRED = "expired" diff --git a/tests/integration/test_client.py b/tests/integration/test_client.py index 2195d3a112b..1ddef2b2f38 100644 --- a/tests/integration/test_client.py +++ b/tests/integration/test_client.py @@ -15,8 +15,11 @@ # along with this program. If not, see . """Test cylc.flow.client.WorkflowRuntimeClient.""" +import json +from unittest.mock import Mock import pytest +from cylc.flow.exceptions import ClientError from cylc.flow.network.client import WorkflowRuntimeClient from cylc.flow.network.server import PB_METHOD_MAP @@ -88,3 +91,35 @@ async def test_command_validation_failure(harness): 'response': [False, '--pre=all must be used alone'], } ] + + +@pytest.mark.parametrize( + 'sock_response, expected', + [ + pytest.param({'error': 'message'}, r"^message$", id="basic"), + pytest.param( + {'foo': 1}, + r"^Received invalid response for Cylc 8\.[\w.]+: \{'foo': 1[^}]*\}$", + id="no-err-field", + ), + pytest.param( + {'cylc_version': '8.x.y'}, + r"^Received invalid.+\n\(Workflow is running in Cylc 8.x.y\)$", + id="no-err-field-with-version", + ), + ], +) +async def test_async_request_err( + one, start, monkeypatch: pytest.MonkeyPatch, sock_response, expected +): + async def mock_recv(): + return json.dumps(sock_response).encode() + + async with start(one): + client = WorkflowRuntimeClient(one.workflow) + with monkeypatch.context() as mp: + mp.setattr(client, 'socket', Mock(recv=mock_recv)) + mp.setattr(client, 'poller', Mock()) + + with pytest.raises(ClientError, match=expected): + await client.async_request('graphql') diff --git a/tests/integration/test_replier.py b/tests/integration/test_replier.py index ce0b53fdaa8..74addb83113 100644 --- a/tests/integration/test_replier.py +++ b/tests/integration/test_replier.py @@ -14,28 +14,38 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . 
-from async_timeout import timeout -from cylc.flow.network import decode_ -from cylc.flow.network.client import WorkflowRuntimeClient import asyncio +import getpass +from async_timeout import timeout import pytest +from cylc.flow import __version__ as CYLC_VERSION +from cylc.flow.network import load_server_response +from cylc.flow.network.client import WorkflowRuntimeClient +from cylc.flow.scheduler import Scheduler + -async def test_listener(one, start, ): +async def test_listener(one: Scheduler, start): """Test listener.""" async with start(one): + # Test listener handles an invalid message from client + # (without directly calling listener): client = WorkflowRuntimeClient(one.workflow) client.socket.send_string(r'Not JSON') - res = await client.socket.recv() - assert 'error' in decode_(res.decode()) + res = load_server_response( + (await client.socket.recv()).decode() + ) + assert res['error'] + assert 'data' not in res + # Check other fields are present: + assert res['cylc_version'] == CYLC_VERSION + assert res['user'] == getpass.getuser() one.server.replier.queue.put('STOP') async with timeout(2): # wait for the server to consume the STOP item from the queue - while True: - if one.server.replier.queue.empty(): - break + while not one.server.replier.queue.empty(): await asyncio.sleep(0.01) # ensure the server is "closed" one.server.replier.queue.put('foobar') diff --git a/tests/integration/test_server.py b/tests/integration/test_server.py index bc7103b8365..6d6e0c939ba 100644 --- a/tests/integration/test_server.py +++ b/tests/integration/test_server.py @@ -14,11 +14,13 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . 
+import logging from typing import Callable from async_timeout import timeout from getpass import getuser import pytest +from cylc.flow import __version__ as CYLC_VERSION from cylc.flow.network.server import PB_METHOD_MAP from cylc.flow.scheduler import Scheduler @@ -89,35 +91,59 @@ async def test_stop(one: Scheduler, start): assert one.server.stopped -async def test_receiver(one: Scheduler, start): +async def test_receiver_basic(one: Scheduler, start, log_filter): """Test the receiver with different message objects.""" async with timeout(5): async with start(one): # start with a message that works - msg = {'command': 'api', 'user': '', 'args': {}} - assert 'error' not in one.server.receiver(msg) - assert 'data' in one.server.receiver(msg) - - # remove the user field - should error - msg2 = dict(msg) - msg2.pop('user') - assert 'error' in one.server.receiver(msg2) - - # remove the command field - should error - msg3 = dict(msg) - msg3.pop('command') - assert 'error' in one.server.receiver(msg3) - - # provide an invalid command - should error - msg4 = {**msg, 'command': 'foobar'} - assert 'error' in one.server.receiver(msg4) + msg = {'command': 'api', 'user': 'bono', 'args': {}} + res = one.server.receiver(msg) + assert not res.get('error') + assert res['data'] + assert res['cylc_version'] == CYLC_VERSION # simulate a command failure with the original message # (the one which worked earlier) - should error def _api(*args, **kwargs): - raise Exception('foo') + raise Exception('oopsie') one.server.api = _api - assert 'error' in one.server.receiver(msg) + res = one.server.receiver(msg) + assert res == { + 'error': {'message': 'oopsie'}, + 'cylc_version': CYLC_VERSION, + } + assert log_filter(logging.ERROR, 'oopsie') + + +@pytest.mark.parametrize( + 'msg, expected', + [ + pytest.param( + {'command': 'api', 'args': {}}, + f"Request missing field 'user' required for Cylc {CYLC_VERSION}", + id='missing-user', + ), + pytest.param( + {'user': 'bono', 'args': {}}, + f"Request 
missing field 'command' required for Cylc {CYLC_VERSION}", + id='missing-command', + ), + pytest.param( + {'command': 'foobar', 'user': 'bono', 'args': {}}, + f"No method by the name 'foobar' at Cylc {CYLC_VERSION}", + id='bad-command', + ), + ], +) +async def test_receiver_bad_requests(one: Scheduler, start, msg, expected): + """Test the receiver with different bad requests.""" + async with timeout(5): + async with start(one): + res = one.server.receiver(msg) + assert res == { + 'error': {'message': expected}, + 'cylc_version': CYLC_VERSION, + } async def test_publish_before_shutdown( From ff805119d22fdab25cc7efe252b0c03db737a5de Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Wed, 29 Jan 2025 16:23:54 +0000 Subject: [PATCH 2/7] Changelog --- changes.d/6578.fix.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes.d/6578.fix.md diff --git a/changes.d/6578.fix.md b/changes.d/6578.fix.md new file mode 100644 index 00000000000..3732f3ad763 --- /dev/null +++ b/changes.d/6578.fix.md @@ -0,0 +1 @@ +Improved handling of any internal errors when executing commands against a running workflow. 
From cc15342b5941aff5f3361eda893150dfdb09263a Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Tue, 11 Feb 2025 15:30:19 +0000 Subject: [PATCH 3/7] Print better message for `cylc trigger` incompatibility between 8.3-8.4 --- cylc/flow/network/multi.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/cylc/flow/network/multi.py b/cylc/flow/network/multi.py index a61c6d05bd2..4d246c8eb02 100644 --- a/cylc/flow/network/multi.py +++ b/cylc/flow/network/multi.py @@ -28,6 +28,7 @@ from ansimarkup import ansiprint +from cylc.flow import __version__ as CYLC_VERSION from cylc.flow.async_util import unordered_map from cylc.flow.exceptions import ( CylcError, @@ -38,6 +39,13 @@ from cylc.flow.terminal import DIM +# Known error messages for incompatibilities between this version of Cylc (that +# is running the command) and the version of Cylc running the workflow: +KNOWN_INCOMPAT = { + 'Unknown argument "onResume" on field "trigger" of type "Mutations".', +} + + def call_multi(*args, **kwargs): """Call a function for each workflow in a list of IDs. @@ -251,6 +259,14 @@ def _report( if isinstance(response, list) and response[0].get('error'): # If operating on workflow running in older Cylc version, # may get a error response like [{'error': '...'}] + if response[0]['error'].get('message') in KNOWN_INCOMPAT: + raise Exception( + "This command is no longer compatible with the " + "version of Cylc running the workflow. Please stop & " + f"restart the workflow with Cylc {CYLC_VERSION} " + "or higher."
+ f"\n\n{response}" + ) raise Exception(response) raise Exception(f"Unexpected response: {response}") for mutation_response in response.values(): From 8529d37f60eba87052f3bd4ef2e0aad2fccafcdb Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Tue, 11 Feb 2025 15:48:58 +0000 Subject: [PATCH 4/7] Move network module tests into own dir Delete redundant tests --- tests/integration/graphql/test_root.py | 79 ------------------- tests/integration/network/__init__.py | 0 tests/integration/{ => network}/key_setup.py | 0 .../integration/{ => network}/test_client.py | 0 .../integration/{ => network}/test_graphql.py | 0 .../{ => network}/test_publisher.py | 0 .../integration/{ => network}/test_replier.py | 0 .../{ => network}/test_resolvers.py | 0 tests/integration/{ => network}/test_scan.py | 0 .../integration/{ => network}/test_server.py | 0 tests/integration/{ => network}/test_zmq.py | 0 tests/integration/test_install.py | 2 +- 12 files changed, 1 insertion(+), 80 deletions(-) delete mode 100644 tests/integration/graphql/test_root.py create mode 100644 tests/integration/network/__init__.py rename tests/integration/{ => network}/key_setup.py (100%) rename tests/integration/{ => network}/test_client.py (100%) rename tests/integration/{ => network}/test_graphql.py (100%) rename tests/integration/{ => network}/test_publisher.py (100%) rename tests/integration/{ => network}/test_replier.py (100%) rename tests/integration/{ => network}/test_resolvers.py (100%) rename tests/integration/{ => network}/test_scan.py (100%) rename tests/integration/{ => network}/test_server.py (100%) rename tests/integration/{ => network}/test_zmq.py (100%) diff --git a/tests/integration/graphql/test_root.py b/tests/integration/graphql/test_root.py deleted file mode 100644 index 27290316cf7..00000000000 --- a/tests/integration/graphql/test_root.py +++ /dev/null @@ -1,79 +0,0 @@ -# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. 
-# Copyright (C) NIWA & British Crown (Met Office) & Contributors. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import pytest - -from cylc.flow.network.client import WorkflowRuntimeClient - - -@pytest.fixture(scope='module') -async def harness(mod_flow, mod_scheduler, mod_run): - id_ = mod_flow({ - 'scheduling': { - 'graph': { - 'R1': ''' - a - b - ''', - }, - }, - 'runtime': { - 'A1': { - 'inherit': 'A2' - }, - 'A2': { - }, - 'a': { - 'inherit': 'A1' - }, - 'b': {}, - }, - }) - schd = mod_scheduler(id_) - async with mod_run(schd): - client = WorkflowRuntimeClient(id_) - - async def _query(query_string): - nonlocal client - return await client.async_request( - 'graphql', - { - 'request_string': 'query { %s } ' % query_string, - } - ) - yield schd, client, _query - - -async def test_workflows(harness): - """It should return True if running.""" - schd, client, query = harness - ret = await query('workflows(ids: ["%s"]) { id }' % schd.workflow) - assert ret == { - 'workflows': [ - {'id': f'~{schd.owner}/{schd.workflow}'} - ] - } - - -async def test_jobs(harness): - """It should return True if running.""" - schd, client, query = harness - ret = await query('workflows(ids: ["%s"]) { id }' % schd.workflow) - assert ret == { - 'workflows': [ - {'id': f'~{schd.owner}/{schd.workflow}'} - ] - } diff --git a/tests/integration/network/__init__.py b/tests/integration/network/__init__.py new file mode 100644 index 
00000000000..e69de29bb2d diff --git a/tests/integration/key_setup.py b/tests/integration/network/key_setup.py similarity index 100% rename from tests/integration/key_setup.py rename to tests/integration/network/key_setup.py diff --git a/tests/integration/test_client.py b/tests/integration/network/test_client.py similarity index 100% rename from tests/integration/test_client.py rename to tests/integration/network/test_client.py diff --git a/tests/integration/test_graphql.py b/tests/integration/network/test_graphql.py similarity index 100% rename from tests/integration/test_graphql.py rename to tests/integration/network/test_graphql.py diff --git a/tests/integration/test_publisher.py b/tests/integration/network/test_publisher.py similarity index 100% rename from tests/integration/test_publisher.py rename to tests/integration/network/test_publisher.py diff --git a/tests/integration/test_replier.py b/tests/integration/network/test_replier.py similarity index 100% rename from tests/integration/test_replier.py rename to tests/integration/network/test_replier.py diff --git a/tests/integration/test_resolvers.py b/tests/integration/network/test_resolvers.py similarity index 100% rename from tests/integration/test_resolvers.py rename to tests/integration/network/test_resolvers.py diff --git a/tests/integration/test_scan.py b/tests/integration/network/test_scan.py similarity index 100% rename from tests/integration/test_scan.py rename to tests/integration/network/test_scan.py diff --git a/tests/integration/test_server.py b/tests/integration/network/test_server.py similarity index 100% rename from tests/integration/test_server.py rename to tests/integration/network/test_server.py diff --git a/tests/integration/test_zmq.py b/tests/integration/network/test_zmq.py similarity index 100% rename from tests/integration/test_zmq.py rename to tests/integration/network/test_zmq.py diff --git a/tests/integration/test_install.py b/tests/integration/test_install.py index 
5a9298c4a8f..3322ccb268a 100644 --- a/tests/integration/test_install.py +++ b/tests/integration/test_install.py @@ -28,7 +28,7 @@ install_cli ) -from .test_scan import init_flows +from .network.test_scan import init_flows from .utils.entry_points import EntryPointWrapper SRV_DIR = Path(WorkflowFiles.Service.DIRNAME) From 43cab1701ad4ac1f8b862d0eac0df0e1ce5d536a Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Fri, 21 Feb 2025 12:58:37 +0000 Subject: [PATCH 5/7] Abstract out the network transport format again --- cylc/flow/network/__init__.py | 14 +++++++++++--- cylc/flow/network/client.py | 12 ++++++------ cylc/flow/network/replier.py | 10 +++++----- tests/integration/network/test_replier.py | 4 ++-- 4 files changed, 24 insertions(+), 16 deletions(-) diff --git a/cylc/flow/network/__init__.py b/cylc/flow/network/__init__.py index 882fe9b99d1..92318f7f2d9 100644 --- a/cylc/flow/network/__init__.py +++ b/cylc/flow/network/__init__.py @@ -95,11 +95,19 @@ class ResponseDict(TypedDict, total=False): back-compat issues.""" -def load_server_response(message: str) -> 'ResponseDict': +def serialize(data: object) -> str: + """Convert the structure holding a message to a JSON message string.""" + # Abstract out the transport format in order to allow it to be changed + # in future. + return json.dumps(data) + + +def deserialize(message: str) -> 'ResponseDict': """Convert a JSON message string to dict with an added 'user' field.""" + # Abstract out the transport format in order to allow it to be changed + # in future. 
msg = json.loads(message) - if 'user' not in msg: - msg['user'] = getpass.getuser() # assume this is the user + msg['user'] = getpass.getuser() # assume this is the user return msg diff --git a/cylc/flow/network/client.py b/cylc/flow/network/client.py index 20a8afb0552..656aefd5c74 100644 --- a/cylc/flow/network/client.py +++ b/cylc/flow/network/client.py @@ -20,7 +20,6 @@ abstractmethod, ) import asyncio -import json import os from shutil import which import socket @@ -51,7 +50,8 @@ from cylc.flow.network import ( ZMQSocketBase, get_location, - load_server_response, + deserialize, + serialize, ) from cylc.flow.network.client_factory import CommsMeth from cylc.flow.network.server import PB_METHOD_MAP @@ -284,7 +284,7 @@ async def async_request( args: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, req_meta: Optional[Dict[str, Any]] = None - ) -> Union[bytes, object]: + ) -> object: """Send an asynchronous request using asyncio. Has the same arguments and return values as ``serial_request``. @@ -306,7 +306,7 @@ async def async_request( if req_meta: msg['meta'].update(req_meta) LOG.debug('zmq:send %s', msg) - message = json.dumps(msg) + message = serialize(msg) self.socket.send_string(message) # receive response @@ -326,7 +326,7 @@ async def async_request( if command in PB_METHOD_MAP: return res - response: ResponseDict = load_server_response(res.decode()) + response: ResponseDict = deserialize(res.decode()) try: return response['data'] @@ -338,7 +338,7 @@ async def async_request( f"{response}" ) wflow_cylc_ver = response.get('cylc_version') - if wflow_cylc_ver: + if wflow_cylc_ver and wflow_cylc_ver != CYLC_VERSION: error += ( f"\n(Workflow is running in Cylc {wflow_cylc_ver})" ) diff --git a/cylc/flow/network/replier.py b/cylc/flow/network/replier.py index 98555a1166f..7512e066014 100644 --- a/cylc/flow/network/replier.py +++ b/cylc/flow/network/replier.py @@ -15,7 +15,6 @@ # along with this program. If not, see . 
"""Server for workflow runtime API.""" -import json from queue import Queue from typing import ( TYPE_CHECKING, @@ -30,7 +29,8 @@ ) from cylc.flow.network import ( ZMQSocketBase, - load_server_response, + deserialize, + serialize, ) @@ -116,7 +116,7 @@ def listener(self) -> None: res: ResponseDict response: bytes try: - message = load_server_response(msg) + message = deserialize(msg) except Exception as exc: # purposefully catch generic exception # failed to decode message, possibly resulting from failed # authentication @@ -126,7 +126,7 @@ def listener(self) -> None: 'error': {'message': str(exc)}, 'cylc_version': CYLC_VERSION, } - response = json.dumps(res).encode() + response = serialize(res).encode() else: # success case - serve the request res = self.server.receiver(message) @@ -135,5 +135,5 @@ def listener(self) -> None: if isinstance(data, bytes): response = data else: - response = json.dumps(res).encode() + response = serialize(res).encode() self.socket.send(response) # type: ignore[union-attr] diff --git a/tests/integration/network/test_replier.py b/tests/integration/network/test_replier.py index 74addb83113..94ea25de020 100644 --- a/tests/integration/network/test_replier.py +++ b/tests/integration/network/test_replier.py @@ -21,7 +21,7 @@ import pytest from cylc.flow import __version__ as CYLC_VERSION -from cylc.flow.network import load_server_response +from cylc.flow.network import deserialize from cylc.flow.network.client import WorkflowRuntimeClient from cylc.flow.scheduler import Scheduler @@ -33,7 +33,7 @@ async def test_listener(one: Scheduler, start): # (without directly calling listener): client = WorkflowRuntimeClient(one.workflow) client.socket.send_string(r'Not JSON') - res = load_server_response( + res = deserialize( (await client.socket.recv()).decode() ) assert res['error'] From 1415531820ea8b3bb13ff7c277973de4b3887059 Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Mon, 24 Feb 2025 11:42:49 
+0000 Subject: [PATCH 6/7] Return all graphql errors to client --- cylc/flow/network/server.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py index 9a44d46921e..56b2d911259 100644 --- a/cylc/flow/network/server.py +++ b/cylc/flow/network/server.py @@ -417,9 +417,10 @@ def graphql( ) if executed.errors: for error in executed.errors: - LOG.warning(error) - if not executed.data: - raise Exception(executed.errors[0]) + LOG.warning(f"GraphQL: {error}") + # If there are execution errors, it means there was an unexpected + # error, so fail the command. + raise Exception(*executed.errors) return executed.data # UIServer Data Commands From d6a21f5d027157701fd686e9435701dc9f8b86f2 Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Tue, 25 Feb 2025 12:01:10 +0000 Subject: [PATCH 7/7] Tidy workflow runtime client error handling --- cylc/flow/exceptions.py | 22 +++++++++++++++++++++- cylc/flow/network/client.py | 15 +++++++-------- cylc/flow/network/resolvers.py | 2 +- cylc/flow/network/scan.py | 10 +++++----- tests/integration/conftest.py | 1 - 5 files changed, 34 insertions(+), 16 deletions(-) diff --git a/cylc/flow/exceptions.py b/cylc/flow/exceptions.py index 802cfaaa9cd..c8ca1836999 100644 --- a/cylc/flow/exceptions.py +++ b/cylc/flow/exceptions.py @@ -18,16 +18,18 @@ from textwrap import wrap from typing import ( + TYPE_CHECKING, Dict, Optional, Sequence, Set, Union, - TYPE_CHECKING, ) +from cylc.flow import __version__ as CYLC_VERSION from cylc.flow.util import format_cmd + if TYPE_CHECKING: from cylc.flow.subprocctx import SubFuncContext @@ -285,6 +287,24 @@ def __str__(self) -> str: return ret +class RequestError(ClientError): + """Represents an error handling a request, returned by the server.""" + + def __init__( + self, message: str, workflow_cylc_version: Optional[str] = None + ): + ClientError.__init__( + self, + message, + traceback=( + 
f"(Workflow is running in Cylc {workflow_cylc_version})" + if workflow_cylc_version + and workflow_cylc_version != CYLC_VERSION + else None + ), + ) + + class WorkflowStopped(ClientError): """The Cylc scheduler you attempted to connect to is stopped.""" diff --git a/cylc/flow/network/client.py b/cylc/flow/network/client.py index 656aefd5c74..27798c58741 100644 --- a/cylc/flow/network/client.py +++ b/cylc/flow/network/client.py @@ -40,17 +40,17 @@ __version__ as CYLC_VERSION, ) from cylc.flow.exceptions import ( - ClientError, ClientTimeout, ContactFileExists, CylcError, + RequestError, WorkflowStopped, ) from cylc.flow.hostuserutil import get_fqdn_by_host from cylc.flow.network import ( ZMQSocketBase, - get_location, deserialize, + get_location, serialize, ) from cylc.flow.network.client_factory import CommsMeth @@ -332,17 +332,16 @@ async def async_request( return response['data'] except KeyError: error = response.get('error') + if isinstance(error, dict): + error = error.get('message', error) if not error: error = ( f"Received invalid response for Cylc {CYLC_VERSION}: " f"{response}" ) - wflow_cylc_ver = response.get('cylc_version') - if wflow_cylc_ver and wflow_cylc_ver != CYLC_VERSION: - error += ( - f"\n(Workflow is running in Cylc {wflow_cylc_ver})" - ) - raise ClientError(str(error)) from None + raise RequestError( + str(error), response.get('cylc_version') + ) from None def get_header(self) -> dict: """Return "header" data to attach to each request for traceability. 
diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index 79b91f97ee7..48ef39795ec 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -804,7 +804,7 @@ def broadcast( if mode == 'expire_broadcast': return self.schd.task_events_mgr.broadcast_mgr.expire_broadcast( cutoff) - raise ValueError('Unsupported broadcast mode') + raise ValueError(f"Unsupported broadcast mode: '{mode}'") def put_ext_trigger( self, diff --git a/cylc/flow/network/scan.py b/cylc/flow/network/scan.py index 6de28ada517..197d2579e97 100644 --- a/cylc/flow/network/scan.py +++ b/cylc/flow/network/scan.py @@ -59,21 +59,21 @@ cast, ) -from packaging.version import parse as parse_version from packaging.specifiers import SpecifierSet +from packaging.version import parse as parse_version from cylc.flow import LOG from cylc.flow.async_util import ( pipe, - scandir + scandir, ) from cylc.flow.cfgspec.glbl_cfg import glbl_cfg -from cylc.flow.exceptions import WorkflowStopped -from cylc.flow.network.client import ( +from cylc.flow.exceptions import ( ClientError, ClientTimeout, - WorkflowRuntimeClient, + WorkflowStopped, ) +from cylc.flow.network.client import WorkflowRuntimeClient from cylc.flow.pathutil import ( get_cylc_run_dir, get_workflow_run_dir, diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index cfcd137b453..5971bce6bb7 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -55,7 +55,6 @@ from cylc.flow.wallclock import get_current_time_string from cylc.flow.workflow_files import infer_latest_run_from_id from cylc.flow.workflow_status import StopMode -from cylc.flow.task_state import TASK_STATUS_SUBMITTED from .utils import _rm_if_empty from .utils.flow_tools import (