Skip to content

Commit

Permalink
Pydocstyle updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Nilsson committed Nov 20, 2023
1 parent 6cd646a commit b1aca9f
Show file tree
Hide file tree
Showing 13 changed files with 598 additions and 423 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.7.1.14
3.7.1.15
29 changes: 15 additions & 14 deletions pilot/eventservice/esprocess/eshook.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,38 @@
# - Wen Guan, [email protected], 2017
# - Paul Nilsson, [email protected], 2023

"""
Hooks for EventService.
"""
"""Hooks for EventService."""


class ESHook:
def get_payload(self):
"""Event Service Hook class."""

def get_payload(self) -> dict:
"""
Get payload to execute.
:returns: dict {'payload': <cmd string>, 'output_file': <filename or without it>, 'error_file': <filename or without it>}
:return: {'payload': <cmd string>, 'output_file': <filenamet>, 'error_file': <filename>} (dict).
"""
raise Exception("Not Implemented")

def get_event_ranges(self, num_ranges=1):
def get_event_ranges(self, num_ranges: int = 1) -> dict:
"""
Get event ranges.
:param num_ranges: Number of event ranges to download, default is 1.
:returns: dict of event ranges.
None if no available events.
:param num_ranges: Number of event ranges to download, default is 1 (int)
:returns: dictionary of event ranges (dict).
"""
raise Exception("Not Implemented")

def handle_out_message(self, message):
def handle_out_message(self, message: dict):
"""
Handle ES output or error message.
:param message: a dict of parsed message.
For 'finished' event ranges, it's {'id': <id>, 'status': 'finished', 'output': <output>, 'cpu': <cpu>,
'wall': <wall>, 'message': <full message>}.
Fro 'failed' event ranges, it's {'id': <id>, 'status': 'finished', 'message': <full message>}.
Example
For 'finished' event ranges, it's {'id': <id>, 'status': 'finished', 'output': <output>, 'cpu': <cpu>,
'wall': <wall>, 'message': <full message>}.
For 'failed' event ranges, it's {'id': <id>, 'status': 'finished', 'message': <full message>}.
:param message: dictionary of a parsed message (dict).
"""
raise Exception("Not Implemented")
21 changes: 10 additions & 11 deletions pilot/eventservice/esprocess/esmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,26 @@
# - Wen Guan, [email protected], 2017
# - Paul Nilsson, [email protected], 2023

"""Event Service manager to set up and run ESProcess."""

import logging
from typing import Any

from pilot.eventservice.esprocess.esprocess import ESProcess
from pilot.eventservice.esprocess.eshook import ESHook

logger = logging.getLogger(__name__)

"""
ES manager to setup and run ESProcess.
"""


class ESManager:
def __init__(self, hook):
"""Event Service manager class."""

def __init__(self, hook: Any):
"""
Initialization: setup ES hooks.
Set up ES hooks.
:param hook: an instance of ESHook.
:param hook: an instance of ESHook (Any)
:raises Exception: if hook is not an instance of ESHook.
"""
logger.info('initializing hooks')
if not isinstance(hook, ESHook):
Expand All @@ -47,10 +49,7 @@ def __init__(self, hook):
logger.info('initialized hooks')

def run(self):
"""
Initialize and run ESProcess.
"""

"""Initialize and run ESProcess."""
logger.debug('gettting payload')
payload = self.__hook.get_payload()
logger.debug(f'got payload: {payload}')
Expand Down
83 changes: 38 additions & 45 deletions pilot/eventservice/esprocess/esmessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,106 +19,99 @@
# - Wen Guan, [email protected], 2017
# - Paul Nilsson, [email protected], 2021-23

"""Event Service message class."""

import logging
import os
import threading
import time
import traceback
from typing import Any

from pilot.common.exception import PilotException, MessageFailure


logger = logging.getLogger(__name__)


class MessageThread(threading.Thread):
"""
A thread to receive messages from payload and put recevied messages to the out queues.
"""
"""A thread to receive messages from payload and put recevied messages to the out queues."""

def __init__(self, message_queue, socket_name=None, context='local', **kwds):
def __init__(self, message_queue: Any, socket_name: str = None, context: str = 'local', **kwds: dict):
"""
Initialize yampl server socket.
:param message_queue: a queue to transfer messages between current instance and ESProcess.
:param socket_name: name of the socket between current process and payload.
:param context: name of the context between current process and payload, default is 'local'.
:param **kwds: other parameters.
:raises MessageFailure: when failed to setup message socket.
:param message_queue: a queue to transfer messages between current instance and ESProcess (Any)
:param socket_name: name of the socket between current process and payload (str)
:param context: name of the context between current process and payload, default is 'local' (str)
:param **kwds: other parameters (dict)
:raises MessageFailure: when failed to set up message socket.
"""

threading.Thread.__init__(self, **kwds)
self.setName("MessageThread")
self.__message_queue = message_queue
self._socket_name = socket_name
self.__stop = threading.Event()

logger.info('try to import yampl')
try:
import yampl
except Exception as e:
raise MessageFailure(f"Failed to import yampl: {e}")
logger.info('finished to import yampl')
except Exception as exc:
raise MessageFailure(f"Failed to import yampl: {exc}")

logger.info('start to setup yampl server socket.')
logger.info('setup yampl server socket')
try:
if self._socket_name is None or len(self._socket_name) == 0:
self._socket_name = f'EventService_EventRanges_{os.getpid()}'
self.__message_server = yampl.ServerSocket(self._socket_name, context)
except Exception as exc:
raise MessageFailure(f"failed to setup yampl server socket: {exc} {traceback.print_exc()}")
logger.info(f'finished to setup yampl server socket(socket_name: {self._socket_name}, context:{context}).')
raise MessageFailure(f"failed to set up yampl server socket: {exc} {traceback.print_exc()}")
logger.info(f'finished setting up yampl server socket (socket_name: {self._socket_name}, context:{context}).')

def get_yampl_socket_name(self) -> str:
"""
Get yampl socket name.
def get_yampl_socket_name(self):
:return: yampl socket name (str).
"""
return self._socket_name

def send(self, message):
def send(self, message: str):
"""
Send messages to payload through yampl server socket.
:param message: String of the message.
:raises MessageFailure: When failed to send a message to the payload.
:param message: message (str).
:raises MessageFailure: when failed to send a message to the payload.
"""
logger.debug(f'Send a message to yampl: {message}')
logger.debug(f'will send message to yampl: {message}')
try:
if not self.__message_server:
raise MessageFailure("No message server.")
self.__message_server.send_raw(message.encode('utf8')) # Python 2 and 3
except Exception as e:
raise MessageFailure(e)
self.__message_server.send_raw(message.encode('utf8'))
except Exception as exc:
raise MessageFailure(exc)

def stop(self):
"""
Set stop event.
"""
"""Set stop event."""
logger.debug('set stop event')
self.__stop.set()

def is_stopped(self):
def is_stopped(self) -> bool:
"""
Get status whether stop event is set.
:returns: True if stop event is set, otherwise False.
:return: True if stop event is set, otherwise False (bool).
"""
return self.__stop.is_set()

def terminate(self):
"""
Terminate message server.
"""
"""Terminate message server."""
if self.__message_server:
logger.info("Terminating message server.")
logger.info("terminating message server.")
del self.__message_server
self.__message_server = None

def run(self):
"""
Main thread loop to poll messages from payload and
put received into message queue for other processes to fetch.
"""
logger.info('Message thread starts to run.')
"""Poll messages from payload and put received into message queue for other processes to fetch."""
logger.info('message thread starts to run')
try:
while True:
if self.is_stopped():
Expand All @@ -134,11 +127,11 @@ def run(self):
self.__message_queue.put(buf.decode('utf8')) # Python 2 and 3
except PilotException as exc:
self.terminate()
logger.error(f"Pilot Exception: Message thread got an exception, will finish: {exc.get_detail()}, {traceback.format_exc()}")
logger.error(f"Pilot Exception: message thread got an exception, will finish: {exc.get_detail()}, {traceback.format_exc()}")
# raise exc
except Exception as exc:
self.terminate()
logger.error(f"Message thread got an exception, will finish: {exc}")
logger.error(f"message thread got an exception, will finish: {exc}")
# raise MessageFailure(exc)
self.terminate()
logger.info('Message thread finished.')
logger.info('message thread finished.')
Loading

0 comments on commit b1aca9f

Please sign in to comment.