Skip to content

Commit

Permalink
Adjustments for job suspension
Browse files Browse the repository at this point in the history
  • Loading branch information
PalNilsson committed Jan 17, 2024
1 parent 901bfaa commit 75d3072
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 13 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.7.1.28
3.7.1.30a
3 changes: 2 additions & 1 deletion pilot/control/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ def run_checks(queues: Any, args: Any) -> None:
last_heartbeat = int(time.time()) - args.pilot_heartbeat
if last_heartbeat > config.Pilot.pilot_heartbeat:
logger.debug(f'pilot heartbeat file was last updated {last_heartbeat} s ago (time to update)')
detected_job_suspension = True if last_heartbeat > 10 * 60 else False

detected_job_suspension = True #if last_heartbeat > 10 * 60 else False
_time = time.time()
# if the pilot heartbeat file can be updated, update the args object
if update_pilot_heartbeat(_time, detected_job_suspension, last_heartbeat):
Expand Down
2 changes: 1 addition & 1 deletion pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '7' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '28' # build number should be reset to '1' for every new development cycle
BUILD = '30b' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
31 changes: 26 additions & 5 deletions pilot/util/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def update_pilot_heartbeat(update_time: float, detected_job_suspension: bool, ti
:return: True if successfully updated heartbeat file, False otherwise (bool).
"""
path = os.path.join(os.getenv('PILOT_HOME', os.getcwd()), config.Pilot.pilot_heartbeat_file)
dictionary = read_pilot_heartbeat()
dictionary = read_pilot_heartbeat(path)
if not dictionary: # redundancy
dictionary = {}

Expand Down Expand Up @@ -85,19 +85,19 @@ def update_pilot_heartbeat(update_time: float, detected_job_suspension: bool, ti
return True


def read_pilot_heartbeat() -> dict:
def read_pilot_heartbeat(path: str) -> dict:
"""
Read the pilot heartbeat file.
:param path: path to heartbeat file (str)
:return: dictionary with pilot heartbeat info (dict).
"""
filename = config.Pilot.pilot_heartbeat_file
dictionary = {}

with lock:
if os.path.exists(filename):
if os.path.exists(path):
try:
dictionary = read_json(filename)
dictionary = read_json(path)
except (PilotException, FileHandlingFailure, ConversionFailure) as exc:
logger.warning(f'failed to read heartbeat file: {exc}')

Expand All @@ -118,6 +118,27 @@ def get_last_update(name: str = 'pilot') -> int:
return 0


def time_since_suspension() -> int:
    """
    Return the time since the pilot detected a job suspension.

    The value is read from the 'time_since_detection' entry of the pilot heartbeat file.
    If it is non-zero, update_pilot_heartbeat() is called to reset the time since detection to zero.
    A failed reset is logged but does not change the returned value.

    :return: time since the pilot detected a job suspension, 0 if no heartbeat info is available (int).
    """
    path = os.path.join(os.getenv('PILOT_HOME', os.getcwd()), config.Pilot.pilot_heartbeat_file)
    dictionary = read_pilot_heartbeat(path)
    if not dictionary:
        # heartbeat file missing, unreadable or empty: nothing to correct for
        return 0

    time_since_detection = dictionary.get('time_since_detection', 0)
    if time_since_detection:
        # reset the time since detection to zero
        if not update_pilot_heartbeat(time.time(), False, 0):
            # do not fail silently: if the reset did not go through, the same value will
            # presumably be returned again on the next call (NOTE(review): confirm against
            # update_pilot_heartbeat)
            logger.warning('failed to reset time since job suspension detection in pilot heartbeat file')

    return time_since_detection


def is_suspended(limit: int = 10 * 60) -> bool:
"""
Check if the pilot was suspended.
Expand Down
35 changes: 30 additions & 5 deletions pilot/util/loopingjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,36 @@
from typing import Any

from pilot.common.errorcodes import ErrorCodes
from pilot.util.auxiliary import whoami, set_pilot_state, cut_output, locate_core_file
from pilot.util.auxiliary import (
whoami,
set_pilot_state,
cut_output,
locate_core_file
)
from pilot.util.config import config
from pilot.util.container import execute #, execute_command
from pilot.util.filehandling import remove_files, find_latest_modified_file, verify_file_list, copy, list_mod_files
from pilot.util.filehandling import (
remove_files,
find_latest_modified_file,
verify_file_list,
copy,
list_mod_files
)
from pilot.util.heartbeat import time_since_suspension
from pilot.util.parameters import convert_to_int
from pilot.util.processes import kill_process, find_zombies, handle_zombies, reap_zombies
from pilot.util.psutils import get_child_processes, get_subprocesses
from pilot.util.processes import (
kill_process,
find_zombies,
handle_zombies,
reap_zombies
)
from pilot.util.psutils import (
get_child_processes,
get_subprocesses
)
from pilot.util.timing import time_stamp

logger = logging.getLogger(__name__)

errors = ErrorCodes()


Expand Down Expand Up @@ -70,6 +89,12 @@ def looping_job(job: Any, montime: Any) -> (int, str):
# check, the returned value will be the same as the previous time
time_last_touched, recent_files = get_time_for_last_touch(job, montime, looping_limit)

# correct for job suspension if detected
time_since_job_suspension = time_since_suspension()
if time_since_job_suspension:
logger.info(f'looping job killer adjusting for job suspension: {time_since_job_suspension} s (adding to time_last_touched))')
time_last_touched += time_since_job_suspension

# the payload process is considered to be looping if its files have not been touched within looping_limit time
if time_last_touched:
currenttime = int(time.time())
Expand Down

0 comments on commit 75d3072

Please sign in to comment.