forked from PanDAWMS/pilot3
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Paul Nilsson
committed
Nov 20, 2023
1 parent
12a11b0
commit 6cd646a
Showing
4 changed files
with
252 additions
and
157 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,34 +20,31 @@ | |
# - Wen Guan, [email protected], 2018 | ||
# - Paul Nilsson, [email protected], 2023 | ||
|
||
""" | ||
Main classes to manage the messages between ES and harvester/ACT/Panda. | ||
""" | ||
"""Main classes to manage the messages between ES and harvester/ACT/Panda.""" | ||
|
||
import json | ||
import logging | ||
import os | ||
import threading | ||
import time | ||
try: | ||
import Queue as queue # noqa: N813 | ||
except Exception: | ||
import queue # Python 3 | ||
import queue | ||
from typing import Any | ||
|
||
from pilot.common import exception | ||
from pilot.common.pluginfactory import PluginFactory | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
""" | ||
Communication response | ||
""" | ||
class CommunicationResponse(): | ||
"""Communication response class.""" | ||
|
||
def __init__(self, attrs: dict = None): | ||
""" | ||
Initialize variables. | ||
class CommunicationResponse(object): | ||
def __init__(self, attrs=None): | ||
:param attrs: attributes dictionary (dict). | ||
""" | ||
if not attrs: | ||
attrs = {} | ||
if not isinstance(attrs, dict): | ||
|
@@ -62,7 +59,12 @@ def __init__(self, attrs=None): | |
for key in attrs: | ||
setattr(self, key, attrs[key]) | ||
|
||
def __str__(self): | ||
def __str__(self) -> str: | ||
""" | ||
Return string representation. | ||
:return: string representation (str). | ||
""" | ||
json_str = {} | ||
for key, value in list(self.__dict__.items()): # Python 2/3 | ||
if value and type(value) is list: | ||
|
@@ -76,19 +78,23 @@ def __str__(self): | |
return json.dumps(json_str) | ||
|
||
|
||
""" | ||
Communication request | ||
""" | ||
class CommunicationRequest(): | ||
"""Communication request class.""" | ||
|
||
class RequestType(): | ||
"""Request type class.""" | ||
|
||
class CommunicationRequest(object): | ||
class RequestType(object): | ||
RequestJobs = 'request_jobs' | ||
UpdateJobs = 'update_jobs' | ||
RequestEvents = 'request_events' | ||
UpdateEvents = 'update_events' | ||
|
||
def __init__(self, attrs=None): | ||
def __init__(self, attrs: dict = None): | ||
""" | ||
Initialize variables. | ||
:param attrs: attributes dictionary (dict). | ||
""" | ||
if not attrs: | ||
attrs = {} | ||
if not isinstance(attrs, dict): | ||
|
@@ -112,6 +118,11 @@ def __init__(self, attrs=None): | |
self.abort = False | ||
|
||
def __str__(self): | ||
""" | ||
Return string representation. | ||
:return: string representation (str). | ||
""" | ||
json_str = {} | ||
for key, value in list(self.__dict__.items()): # Python 2/3 | ||
if value and type(value) is list: | ||
|
@@ -122,17 +133,20 @@ def __str__(self): | |
json_str[key] = str(value) | ||
else: | ||
json_str[key] = value | ||
return json.dumps(json_str) | ||
|
||
|
||
""" | ||
Communication manager thread | ||
""" | ||
return json.dumps(json_str) | ||
|
||
|
||
class CommunicationManager(threading.Thread, PluginFactory): | ||
"""Communication manager class.""" | ||
|
||
def __init__(self, *args, **kwargs): | ||
""" | ||
Initialize variables. | ||
:param args: args object (Any) | ||
:param kwargs: kwargs dictionary (dict). | ||
""" | ||
super(CommunicationManager, self).__init__() | ||
PluginFactory.__init__(self, *args, **kwargs) | ||
self.setName("CommunicationManager") | ||
|
@@ -159,29 +173,28 @@ def __init__(self, *args, **kwargs): | |
self.kwargs = kwargs | ||
|
||
def stop(self): | ||
""" | ||
Set stop signal(main run process will clean queued requests to release waiting clients and then quit) | ||
""" | ||
"""Set stop signal (main run process will clean queued requests to release waiting clients and then quit).""" | ||
if not self.is_stop(): | ||
logger.info("Stopping Communication Manager.") | ||
logger.info("stopping Communication Manager.") | ||
self.stop_event.set() | ||
|
||
def is_stop(self): | ||
def is_stop(self) -> bool: | ||
""" | ||
check whether the stop signal is set | ||
Check whether the stop signal is set. | ||
:returns: True if the stop signal is set, otherwise False | ||
:returns: True if the stop signal is set, otherwise False (bool) | ||
""" | ||
return self.stop_event.is_set() | ||
|
||
def get_jobs(self, njobs=1, post_hook=None, args=None): | ||
def get_jobs(self, njobs: int = 1, post_hook: Any = None, args: Any = None) -> Any: | ||
""" | ||
Get jobs. | ||
Function can be called by client to send a get_job request and get a response with jobs. | ||
:returns: jobs(got from jobs servers) | ||
:raise: Exception catched when getting jobs | ||
:raises: Exception caught when getting jobs from server | ||
:return: jobs (from server) (Any). | ||
""" | ||
|
||
if self.is_stop(): | ||
return None | ||
|
||
|
@@ -213,14 +226,17 @@ def get_jobs(self, njobs=1, post_hook=None, args=None): | |
else: | ||
return req.response.content | ||
|
||
def update_jobs(self, jobs, post_hook=None): | ||
def update_jobs(self, jobs: Any, post_hook: Any = None) -> Any: | ||
""" | ||
Update jobs. | ||
Function can be called by client to update jobs' status to server. | ||
:returns: status of updating jobs | ||
:raise: Exception catched when updating jobs | ||
:param jobs: jobs to be updated (Any) | ||
:param post_hook: post hook function (Any) | ||
:raises: Exception caught when updating jobs | ||
:return: status of updating jobs (Any). | ||
""" | ||
|
||
if self.is_stop(): | ||
return None | ||
|
||
|
@@ -243,14 +259,18 @@ def update_jobs(self, jobs, post_hook=None): | |
else: | ||
return req.response.content | ||
|
||
def get_event_ranges(self, num_event_ranges=1, post_hook=None, job=None): | ||
def get_event_ranges(self, num_event_ranges: int = 1, post_hook: Any = None, job: Any = None) -> Any: | ||
""" | ||
Get event ranges. | ||
Function can be called by client to send a get_event_ranges request and get a response with event ranges. | ||
:returns: event ranges (got from jobs servers) | ||
:param num_event_ranges: number of event ranges to get (int) | ||
:param post_hook: post hook function (Any) | ||
:param job: job info (Any) | ||
:raise: Exception caught when getting event ranges | ||
:return: event ranges (from server) (Any). | ||
""" | ||
|
||
if self.is_stop(): | ||
return None | ||
|
||
|
@@ -284,14 +304,17 @@ def get_event_ranges(self, num_event_ranges=1, post_hook=None, job=None): | |
else: | ||
return req.response.content | ||
|
||
def update_events(self, update_events, post_hook=None): | ||
def update_events(self, update_events: Any, post_hook: Any = None) -> Any: | ||
""" | ||
Update events. | ||
Function can be called by client to send a update_events request. | ||
:returns: status of updating event ranges | ||
:raise: Exception catched when updating event ranges | ||
:param update_events: update events (Any) | ||
:param post_hook: post hook function (Any) | ||
:raises: Exception caught when updating event ranges | ||
:return: status of updating event ranges | ||
""" | ||
|
||
if self.is_stop(): | ||
return None | ||
|
||
|
@@ -313,13 +336,12 @@ def update_events(self, update_events, post_hook=None): | |
else: | ||
return req.response.content | ||
|
||
def get_plugin_confs(self): | ||
def get_plugin_confs(self) -> dict: | ||
""" | ||
Get different plugin for different communicator | ||
Get different plug-in for different communicator. | ||
:returns: dict with {'class': <plugin_class>} and other items | ||
:returns: dict with {'class': <plugin_class>} and other items (dict). | ||
""" | ||
|
||
plugin = os.environ.get('COMMUNICATOR_PLUGIN', None) | ||
if not plugin: | ||
plugin_confs = {'class': 'pilot.eventservice.communicationmanager.plugins.pandacommunicator.PandaCommunicator'} | ||
|
@@ -333,16 +355,20 @@ def get_plugin_confs(self): | |
if self.args: | ||
for key, value in list(vars(self.args).items()): # Python 2/3 | ||
plugin_confs[key] = value | ||
|
||
return plugin_confs | ||
|
||
def can_process_request(self, processor, process_type): | ||
def can_process_request(self, processor: dict, process_type: str) -> bool: | ||
""" | ||
To check whether it is ready to process request in a type. | ||
For request such as HarvesterShareFileCommunicator, it should check whether there are processing requests to avoid overwriting files. | ||
Check whether it is ready to process request in a type. | ||
:returns: True or False | ||
""" | ||
For request such as HarvesterShareFileCommunicator, it should check whether there are processing requests to | ||
avoid overwriting files. | ||
:param processor: processor dictionary (dict) | ||
:param process_type: process type (str) | ||
:return: True or False (bool). | ||
""" | ||
if self.queues[process_type].empty(): | ||
return False | ||
|
||
|
@@ -356,14 +382,11 @@ def can_process_request(self, processor, process_type): | |
return False | ||
|
||
def run(self): | ||
""" | ||
Main loop to handle communication requests | ||
""" | ||
|
||
"""Handle communication requests.""" | ||
confs = self.get_plugin_confs() | ||
logger.info(f"Communication plugin confs: {confs}") | ||
logger.info(f"communication plugin confs: {confs}") | ||
communicator = self.get_plugin(confs) | ||
logger.info(f"Communication: {communicator}") | ||
logger.info(f"communicator: {communicator}") | ||
|
||
processor = {'request_get_jobs': {'pre_check': communicator.pre_check_get_jobs, | ||
'handler': communicator.request_get_jobs, | ||
|
@@ -408,14 +431,14 @@ def run(self): | |
if not pre_check_resp.status == 0: | ||
continue | ||
|
||
logger.info(f"Processing {process_type}") | ||
logger.info(f"processing {process_type}") | ||
|
||
has_req = True | ||
req = self.queues[process_type].get() | ||
|
||
logger.info(f"Processing {process_type} request: {req}") | ||
logger.info(f"processing {process_type} request: {req}") | ||
res = processor[process_type]['handler'](req) | ||
logger.info(f"Processing {process_type} respone: {res}") | ||
logger.info(f"processing {process_type} respone: {res}") | ||
|
||
if res.status is False: | ||
req.response = res | ||
|
@@ -432,4 +455,5 @@ def run(self): | |
if self.is_stop(): | ||
break | ||
time.sleep(1) | ||
logger.info("Communication manager stopped.") | ||
|
||
logger.info("communication manager finished") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,3 +19,5 @@ | |
# Authors: | ||
# - Wen Guan, [email protected], 2017 | ||
# - Paul Nilsson, [email protected], 2023 | ||
|
||
"""Default init.""" |
Oops, something went wrong.