diff --git a/pilot/eventservice/communicationmanager/communicationmanager.py b/pilot/eventservice/communicationmanager/communicationmanager.py index 2b121009..863855e7 100644 --- a/pilot/eventservice/communicationmanager/communicationmanager.py +++ b/pilot/eventservice/communicationmanager/communicationmanager.py @@ -20,34 +20,31 @@ # - Wen Guan, wen.guan@cern.ch, 2018 # - Paul Nilsson, paul.nilsson@cern.ch, 2023 -""" -Main classes to manage the messages between ES and harvester/ACT/Panda. -""" +"""Main classes to manage the messages between ES and harvester/ACT/Panda.""" import json import logging import os import threading import time -try: - import Queue as queue # noqa: N813 -except Exception: - import queue # Python 3 +import queue +from typing import Any from pilot.common import exception from pilot.common.pluginfactory import PluginFactory - logger = logging.getLogger(__name__) -""" -Communication response -""" +class CommunicationResponse(): + """Communication response class.""" + def __init__(self, attrs: dict = None): + """ + Initialize variables. -class CommunicationResponse(object): - def __init__(self, attrs=None): + :param attrs: attributes dictionary (dict). + """ if not attrs: attrs = {} if not isinstance(attrs, dict): @@ -62,7 +59,12 @@ def __init__(self, attrs=None): for key in attrs: setattr(self, key, attrs[key]) - def __str__(self): + def __str__(self) -> str: + """ + Return string representation. + + :return: string representation (str). + """ json_str = {} for key, value in list(self.__dict__.items()): # Python 2/3 if value and type(value) is list: @@ -76,19 +78,23 @@ def __str__(self): return json.dumps(json_str) -""" -Communication request -""" +class CommunicationRequest(): + """Communication request class.""" + class RequestType(): + """Request type class.""" -class CommunicationRequest(object): - class RequestType(object): RequestJobs = 'request_jobs' UpdateJobs = 'update_jobs' RequestEvents = 'request_events' UpdateEvents = 'update_events' - def __init__(self, attrs=None): + def __init__(self, attrs: dict = None): + """ + Initialize variables. + + :param attrs: attributes dictionary (dict). + """ if not attrs: attrs = {} if not isinstance(attrs, dict): @@ -112,6 +118,11 @@ def __init__(self, attrs=None): self.abort = False def __str__(self): + """ + Return string representation. + + :return: string representation (str). + """ json_str = {} for key, value in list(self.__dict__.items()): # Python 2/3 if value and type(value) is list: @@ -122,17 +133,20 @@ def __str__(self): json_str[key] = str(value) else: json_str[key] = value - return json.dumps(json_str) - -""" -Communication manager thread -""" + return json.dumps(json_str) class CommunicationManager(threading.Thread, PluginFactory): + """Communication manager class.""" def __init__(self, *args, **kwargs): + """ + Initialize variables. + + :param args: args object (Any) + :param kwargs: kwargs dictionary (dict). + """ super(CommunicationManager, self).__init__() PluginFactory.__init__(self, *args, **kwargs) self.setName("CommunicationManager") @@ -159,29 +173,28 @@ def __init__(self, *args, **kwargs): self.kwargs = kwargs def stop(self): - """ - Set stop signal(main run process will clean queued requests to release waiting clients and then quit) - """ + """Set stop signal (main run process will clean queued requests to release waiting clients and then quit).""" if not self.is_stop(): - logger.info("Stopping Communication Manager.") + logger.info("stopping Communication Manager.") self.stop_event.set() - def is_stop(self): + def is_stop(self) -> bool: """ - check whether the stop signal is set + Check whether the stop signal is set. - :returns: True if the stop signal is set, otherwise False + :returns: True if the stop signal is set, otherwise False (bool) """ return self.stop_event.is_set() - def get_jobs(self, njobs=1, post_hook=None, args=None): + def get_jobs(self, njobs: int = 1, post_hook: Any = None, args: Any = None) -> Any: """ + Get jobs. + Function can be called by client to send a get_job request and get a response with jobs. - :returns: jobs(got from jobs servers) - :raise: Exception catched when getting jobs + :raises: Exception caught when getting jobs from server + :return: jobs (from server) (Any). """ - if self.is_stop(): return None @@ -213,14 +226,17 @@ def get_jobs(self, njobs=1, post_hook=None, args=None): else: return req.response.content - def update_jobs(self, jobs, post_hook=None): + def update_jobs(self, jobs: Any, post_hook: Any = None) -> Any: """ + Update jobs. + Function can be called by client to update jobs' status to server. - :returns: status of updating jobs - :raise: Exception catched when updating jobs + :param jobs: jobs to be updated (Any) + :param post_hook: post hook function (Any) + :raises: Exception caught when updating jobs + :return: status of updating jobs (Any). """ - if self.is_stop(): return None @@ -243,14 +259,18 @@ def update_jobs(self, jobs, post_hook=None): else: return req.response.content - def get_event_ranges(self, num_event_ranges=1, post_hook=None, job=None): + def get_event_ranges(self, num_event_ranges: int = 1, post_hook: Any = None, job: Any = None) -> Any: """ + Get event ranges. + Function can be called by client to send a get_event_ranges request and get a response with event ranges. - :returns: event ranges (got from jobs servers) + :param num_event_ranges: number of event ranges to get (int) + :param post_hook: post hook function (Any) + :param job: job info (Any) :raise: Exception caught when getting event ranges + :return: event ranges (from server) (Any). """ - if self.is_stop(): return None @@ -284,14 +304,17 @@ def get_event_ranges(self, num_event_ranges=1, post_hook=None, job=None): else: return req.response.content - def update_events(self, update_events, post_hook=None): + def update_events(self, update_events: Any, post_hook: Any = None) -> Any: """ + Update events. + Function can be called by client to send a update_events request. - :returns: status of updating event ranges - :raise: Exception catched when updating event ranges + :param update_events: update events (Any) + :param post_hook: post hook function (Any) + :raises: Exception caught when updating event ranges + :return: status of updating event ranges """ - if self.is_stop(): return None @@ -313,13 +336,12 @@ def update_events(self, update_events, post_hook=None): else: return req.response.content - def get_plugin_confs(self): + def get_plugin_confs(self) -> dict: """ - Get different plugin for different communicator + Get different plug-in for different communicator. - :returns: dict with {'class': } and other items + :returns: dict with {'class': } and other items (dict). """ - plugin = os.environ.get('COMMUNICATOR_PLUGIN', None) if not plugin: plugin_confs = {'class': 'pilot.eventservice.communicationmanager.plugins.pandacommunicator.PandaCommunicator'} @@ -333,16 +355,20 @@ def get_plugin_confs(self): if self.args: for key, value in list(vars(self.args).items()): # Python 2/3 plugin_confs[key] = value + return plugin_confs - def can_process_request(self, processor, process_type): + def can_process_request(self, processor: dict, process_type: str) -> bool: """ - To check whether it is ready to process request in a type. - For request such as HarvesterShareFileCommunicator, it should check whether there are processing requests to avoid overwriting files. + Check whether it is ready to process request in a type. - :returns: True or False - """ + For request such as HarvesterShareFileCommunicator, it should check whether there are processing requests to + avoid overwriting files. + :param processor: processor dictionary (dict) + :param process_type: process type (str) + :return: True or False (bool). + """ if self.queues[process_type].empty(): return False @@ -356,14 +382,11 @@ def can_process_request(self, processor, process_type): return False def run(self): - """ - Main loop to handle communication requests - """ - + """Handle communication requests.""" confs = self.get_plugin_confs() - logger.info(f"Communication plugin confs: {confs}") + logger.info(f"communication plugin confs: {confs}") communicator = self.get_plugin(confs) - logger.info(f"Communication: {communicator}") + logger.info(f"communicator: {communicator}") processor = {'request_get_jobs': {'pre_check': communicator.pre_check_get_jobs, 'handler': communicator.request_get_jobs, @@ -408,14 +431,14 @@ def run(self): if not pre_check_resp.status == 0: continue - logger.info(f"Processing {process_type}") + logger.info(f"processing {process_type}") has_req = True req = self.queues[process_type].get() - logger.info(f"Processing {process_type} request: {req}") + logger.info(f"processing {process_type} request: {req}") res = processor[process_type]['handler'](req) - logger.info(f"Processing {process_type} respone: {res}") + logger.info(f"processing {process_type} respone: {res}") if res.status is False: req.response = res @@ -432,4 +455,5 @@ def run(self): if self.is_stop(): break time.sleep(1) - logger.info("Communication manager stopped.") + + logger.info("communication manager finished") diff --git a/pilot/eventservice/communicationmanager/plugins/__init__.py b/pilot/eventservice/communicationmanager/plugins/__init__.py index 02cf1dd8..afe6e4f7 100644 --- a/pilot/eventservice/communicationmanager/plugins/__init__.py +++ b/pilot/eventservice/communicationmanager/plugins/__init__.py @@ -19,3 +19,5 @@ # Authors: # - Wen Guan, wen.guan@cern.ch, 2017 # - Paul Nilsson, paul.nilsson@cern.ch, 2023 + +"""Default init.""" diff --git a/pilot/eventservice/communicationmanager/plugins/basecommunicator.py b/pilot/eventservice/communicationmanager/plugins/basecommunicator.py index e2fcdc82..0adaa067 100644 --- a/pilot/eventservice/communicationmanager/plugins/basecommunicator.py +++ b/pilot/eventservice/communicationmanager/plugins/basecommunicator.py @@ -20,96 +20,147 @@ # - Wen Guan, wen.guan@cern.ch, 2018 # - Paul Nilsson, paul.nilsson@cern.ch, 2020-23 +"""Base communicator.""" + import logging -logger = logging.getLogger(__name__) +from typing import Any -""" -Base communicator -""" +logger = logging.getLogger(__name__) class BaseCommunicator(object): + """Base communicator class.""" + _instance = None - def __new__(class_, *args, **kwargs): + def __new__(class_, *args: Any, **kwargs: dict) -> Any: + """ + Create new instance of class. + + :param args: args object (Any) + :param kwargs: kwargs dictionary (dict) + :return: new class instance (Any). + """ if not isinstance(class_._instance, class_): class_._instance = object.__new__(class_, *args, **kwargs) + return class_._instance - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: dict): + """ + Initialize variables. + + :param args: args object (Any) + :param kwargs: kwargs dictionary (dict) + """ super(BaseCommunicator, self).__init__() for key in kwargs: setattr(self, key, kwargs[key]) - def pre_check_get_jobs(self, req): + def pre_check_get_jobs(self, req: Any): """ - Precheck whether it's ok to send a requst to get jobs. + Check whether it's ok to send a request to get jobs. + + :param req: request (Any) + :raises: NotImplementedError. """ - #raise exception.NotImplementedError() raise NotImplementedError() - def request_get_jobs(self, req): + def request_get_jobs(self, req: Any): """ - Send a requst to get jobs. + Send a request to get jobs. + + :param req: request (Any) + :raises: NotImplementedError. """ raise NotImplementedError() - def check_get_jobs_status(self, req): + def check_get_jobs_status(self, req: Any): """ - Check whether jobs are prepared + Check whether jobs are prepared. + + :param req: request (Any) + :raises: NotImplementedError. """ raise NotImplementedError() - def get_jobs(self, req): + def get_jobs(self, req: Any): """ - Get the job + Get the jobs. + + :param req: request (Any) + :raises: NotImplementedError. """ raise NotImplementedError() - def update_jobs(self, req): + def update_jobs(self, req: Any): """ - Update jobs status. + Update job statuses. + + :param req: request (Any) + :raises: NotImplementedError. """ raise NotImplementedError() - def pre_check_get_events(self, req): + def pre_check_get_events(self, req: Any): """ - Precheck whether it's ok to send a request to get events. + Check whether it's ok to send a request to get events. + + :param req: request (Any) + :raises: NotImplementedError. """ raise NotImplementedError() - def request_get_events(self, req): + def request_get_events(self, req: Any): """ - Send a requst to get events. + Send a request to get events. + + :param req: request (Any) + :raises: NotImplementedError. """ raise NotImplementedError() - def check_get_events_status(self, req): + def check_get_events_status(self, req: Any): """ - Check whether events prepared + Check whether events prepared. + + :param req: request (Any) + :raises: NotImplementedError. """ raise NotImplementedError() - def get_events(self, req): + def get_events(self, req: Any): """ - Get events + Get events. + + :param req: request (Any) + :raises: NotImplementedError. """ raise NotImplementedError() - def pre_check_update_events(self, req): + def pre_check_update_events(self, req: Any): """ - Precheck whether it's ok to update events. + Check whether it's ok to update events. + + :param req: request (Any) + :raises: NotImplementedError. """ raise NotImplementedError() - def update_events(self, req): + def update_events(self, req: Any): """ Update events. + + :param req: request (Any) + :raises: NotImplementedError. """ raise NotImplementedError() - def pre_check_update_jobs(self, req): + def pre_check_update_jobs(self, req: Any): """ - Precheck whether it's ok to update event ranges. + Check whether it's ok to update event ranges. + + :param req: request (Any) + :raises: NotImplementedError. """ raise NotImplementedError() diff --git a/pilot/eventservice/communicationmanager/plugins/pandacommunicator.py b/pilot/eventservice/communicationmanager/plugins/pandacommunicator.py index 0feae9fb..c1c34697 100644 --- a/pilot/eventservice/communicationmanager/plugins/pandacommunicator.py +++ b/pilot/eventservice/communicationmanager/plugins/pandacommunicator.py @@ -20,10 +20,13 @@ # - Wen Guan, wen.guan@cern.ch, 2018 # - Paul Nilsson, paul.nilsson@cern.ch, 2020-23 -import json +"""PanDA communicator.""" + +import logging import threading import traceback from os import environ +from typing import Any from pilot.common import exception from pilot.util import https @@ -31,47 +34,59 @@ from ..communicationmanager import CommunicationResponse from .basecommunicator import BaseCommunicator -import logging logger = logging.getLogger(__name__) -""" -Panda Communicator -""" - class PandaCommunicator(BaseCommunicator): - def __init__(self, *args, **kwargs): + """PanDA communicator class.""" + + def __init__(self, *args: Any, **kwargs: dict): + """ + Initialize variables. + + :param args: args object (Any) + :param kwargs: kwargs dictionary (dict) + """ super(PandaCommunicator, self).__init__(args, kwargs) self.get_jobs_lock = threading.Lock() self.get_events_lock = threading.Lock() self.update_events_lock = threading.Lock() self.update_jobs_lock = threading.Lock() - def pre_check_get_jobs(self, req=None): + def pre_check_get_jobs(self, req=None) -> Any: """ - Precheck whether it's ok to send a requst to get jobs. + Check whether it's ok to send a request to get jobs. + + :param req: request (Any) + :return: CommunicationResponse({'status': 0}) (Any). """ return CommunicationResponse({'status': 0}) - def request_get_jobs(self, req): + def request_get_jobs(self, req: Any) -> Any: """ - Send a requst to get jobs. + Send a request to get jobs. + + :param req: request (Any) + :return: CommunicationResponse({'status': 0}) (Any). """ return CommunicationResponse({'status': 0}) def check_get_jobs_status(self, req=None): """ - Check whether jobs are prepared + Check whether jobs are prepared. + + :param req: request (Any) + :return: CommunicationResponse({'status': 0}) (Any). """ return CommunicationResponse({'status': 0}) - def get_jobs(self, req): + def get_jobs(self, req: Any) -> dict: """ Get the job definition from panda server. - :return: job definiton dictionary. + :param req: request (Any) + :return: job definition dictionary (dict). """ - self.get_jobs_lock.acquire() try: @@ -121,31 +136,42 @@ def get_jobs(self, req): return resp - def pre_check_get_events(self, req=None): + def pre_check_get_events(self, req: Any = None) -> Any: """ Precheck whether it's ok to send a request to get events. + + :param req: request (Any) + :return: CommunicationResponse({'status': 0}) (Any). """ return CommunicationResponse({'status': 0}) - def request_get_events(self, req): + def request_get_events(self, req: Any) -> Any: """ - Send a requst to get events. + Send a request to get events. + + :param req: request (Any) + :return: CommunicationResponse({'status': 0}) (Any). """ return CommunicationResponse({'status': 0}) - def check_get_events_status(self, req=None): + def check_get_events_status(self, req: Any = None) -> Any: """ - Check whether events prepared + Check whether events prepared. + + :param req: request (Any) + :return: CommunicationResponse({'status': 0}) (Any). """ return CommunicationResponse({'status': 0}) - def get_events(self, req): + def get_events(self, req: Any) -> Any: """ - Get events + Get events. + + :param req: request (Any) + :return: CommunicationResponse({'status': 0}) (Any). """ self.get_events_lock.acquire() - resp = None try: if not req.num_ranges: # ToBeFix num_ranges with corecount @@ -182,9 +208,12 @@ def get_events(self, req): return resp - def pre_check_update_events(self, req=None): + def pre_check_update_events(self, req: Any = None) -> Any: """ Precheck whether it's ok to update events. + + :param req: request (Any) + :return: CommunicationResponse({'status': 0}) (Any). """ self.update_events_lock.acquire() try: @@ -192,15 +221,18 @@ def pre_check_update_events(self, req=None): except Exception as e: # Python 2/3 logger.error(f"Failed to pre_check_update_events: {e}, {traceback.format_exc()}") self.update_events_lock.release() + return CommunicationResponse({'status': 0}) - def update_events(self, req): + def update_events(self, req: Any) -> Any: """ Update events. + + :param req: request (Any) + :return: CommunicationResponse({'status': 0}) (Any). """ self.update_events_lock.acquire() - resp = None try: logger.info(f"Updating events: {req}") url = environ.get('PANDA_SERVER_URL', config.Pilot.pandaserver) @@ -215,25 +247,31 @@ def update_events(self, req): resp = CommunicationResponse(resp_attrs) self.update_events_lock.release() + return resp - def pre_check_update_jobs(self, req=None): + def pre_check_update_jobs(self, req: Any = None) -> Any: """ - Precheck whether it's ok to update jobs. + Check whether it's ok to update jobs. + + :param req: request (Any) + :return: CommunicationResponse({'status': 0}) (Any). """ - self.update_jobs_lock.acquire() try: - pass - except Exception as e: # Python 2/3 - logger.error(f"Failed to pre_check_update_jobs: {e}, {traceback.format_exc()}") - self.update_jobs_lock.release() + self.update_jobs_lock.acquire() + + self.update_jobs_lock.release() + except Exception as exc: + logger.error(f"failed in pre_check_update_jobs: {exc}, {traceback.format_exc()}") return CommunicationResponse({'status': 0}) - def update_job(self, job): + def update_job(self, job: Any) -> int: """ Update job. - """ + :param job: job definition (Any) + :return: status code (int). + """ try: logger.info(f"Updating job: {job}") url = environ.get('PANDA_SERVER_URL', config.Pilot.pandaserver) @@ -241,17 +279,19 @@ def update_job(self, job): logger.info(f"Updated jobs status: {res}") return res - except Exception as e: # Python 2/3 - logger.error(f"Failed to update jobs: {e}, {traceback.format_exc()}") + except Exception as exc: + logger.error(f"failed to update jobs: {exc}, {traceback.format_exc()}") return -1 - def update_jobs(self, req): + def update_jobs(self, req: Any) -> Any: """ Update jobs. + + :param req: request (Any) + :return: CommunicationResponse({'status': 0}) (Any). """ self.update_jobs_lock.acquire() - resp = None try: logger.info(f"Updating jobs: {req}") res_list = [] @@ -266,27 +306,5 @@ def update_jobs(self, req): resp = CommunicationResponse(resp_attrs) self.update_jobs_lock.release() - return resp - def update_jobs_old(self, req): - """ - Update jobs. - """ - self.update_jobs_lock.acquire() - - try: - logger.info(f"Updating jobs: {req}") - data = {'jobList': json.dumps(req.jobs)} - url = environ.get('PANDA_SERVER_URL', config.Pilot.pandaserver) - res = https.request(f'{url}/server/panda/updateJobsInBulk', data=data) - - logger.info(f"Updated jobs status: {res}") - resp_attrs = {'status': 0, 'content': res, 'exception': None} - resp = CommunicationResponse(resp_attrs) - except Exception as e: # Python 2/3 - logger.error(f"Failed to update jobs: {e}, {traceback.format_exc()}") - resp_attrs = {'status': -1, 'content': None, 'exception': exception.UnknownException(f"Failed to update jobs: {traceback.format_exc()}")} - resp = CommunicationResponse(resp_attrs) - - self.update_jobs_lock.release() return resp