Skip to content

Commit

Permalink
minor refactor to use pipestat properties instead of cfg dict
Browse files Browse the repository at this point in the history
  • Loading branch information
donaldcampbelljr committed Nov 10, 2023
1 parent 1c2c844 commit 5b46682
Showing 1 changed file with 22 additions and 20 deletions.
42 changes: 22 additions & 20 deletions pypiper/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ def __init__(
signal.signal(signal.SIGTERM, self._signal_term_handler)

# pipestat setup
self.pipestat_sample_name = pipestat_sample_name or DEFAULT_SAMPLE_NAME
self.pipestat_record_identifier = pipestat_sample_name or DEFAULT_SAMPLE_NAME

# don't force default pipestat_results_file value unless
# pipestat config not provided
Expand All @@ -343,7 +343,7 @@ def _get_arg(args_dict, arg_name):
return None if arg_name not in args_dict else args_dict[arg_name]

self._pipestat_manager = PipestatManager(
record_identifier=self.pipestat_sample_name
record_identifier=self.pipestat_record_identifier
or _get_arg(args_dict, "pipestat_sample_name")
or DEFAULT_SAMPLE_NAME,
pipeline_name=self.name,
Expand Down Expand Up @@ -436,7 +436,7 @@ def _completed(self):
:return bool: Whether the managed pipeline is in a completed state.
"""
return (
self.pipestat.get_status(self._pipestat_manager.cfg["record_identifier"])
self.pipestat.get_status(self._pipestat_manager.record_identifier)
== COMPLETE_FLAG
)

Expand All @@ -448,7 +448,7 @@ def _failed(self):
:return bool: Whether the managed pipeline is in a failed state.
"""
return (
self.pipestat.get_status(self._pipestat_manager.cfg["record_identifier"])
self.pipestat.get_status(self._pipestat_manager.record_identifier)
== FAIL_FLAG
)

Expand All @@ -459,7 +459,7 @@ def halted(self):
:return bool: Whether the managed pipeline is in a paused/halted state.
"""
return (
self.pipestat.get_status(self._pipestat_manager.cfg["record_identifier"])
self.pipestat.get_status(self._pipestat_manager.record_identifier)
== PAUSE_FLAG
)

Expand Down Expand Up @@ -723,11 +723,11 @@ def start_pipeline(self, args=None, multi=False):
results = self._pipestat_manager.__str__().split("\n")
for i in results:
self.info("* " + i)
self.info("* Sample name: " + self.pipestat_sample_name + "\n")
self.info("* Sample name: " + self.pipestat_record_identifier + "\n")
self.info("\n----------------------------------------\n")
self.status = "running"
self.pipestat.set_status(
record_identifier=self._pipestat_manager.cfg["record_identifier"],
record_identifier=self._pipestat_manager.record_identifier,
status_identifier="running",
)

Expand Down Expand Up @@ -774,7 +774,7 @@ def _set_status_flag(self, status):
prev_status = self.status
self.status = status
self.pipestat.set_status(
record_identifier=self._pipestat_manager.cfg["record_identifier"],
record_identifier=self._pipestat_manager.record_identifier,
status_identifier=status,
)
self.debug("\nChanged status from {} to {}.".format(prev_status, self.status))
Expand All @@ -791,8 +791,8 @@ def _flag_file_path(self, status=None):
"""

flag_file_name = "{}_{}_{}".format(
self._pipestat_manager.cfg["pipeline_name"],
self.pipestat_sample_name,
self._pipestat_manager.pipeline_name,
self.pipestat_record_identifier,
flag_name(status or self.status),
)
return pipeline_filepath(self, filename=flag_file_name)
Expand Down Expand Up @@ -1424,7 +1424,7 @@ def _wait_for_lock(self, lock_file):
)
# self._set_status_flag(WAIT_FLAG)
self.pipestat.set_status(
record_identifier=self._pipestat_manager.cfg["record_identifier"],
record_identifier=self._pipestat_manager.record_identifier,
status_identifier="waiting",
)
first_message_flag = True
Expand All @@ -1448,7 +1448,7 @@ def _wait_for_lock(self, lock_file):
self.timestamp("File unlocked.")
# self._set_status_flag(RUN_FLAG)
self.pipestat.set_status(
record_identifier=self._pipestat_manager.cfg["record_identifier"],
record_identifier=self._pipestat_manager.record_identifier,
status_identifier="running",
)

Expand Down Expand Up @@ -1607,7 +1607,7 @@ def report_result(self, key, value, nolog=False, result_formatter=None):

reported_result = self.pipestat.report(
values={key: value},
record_identifier=self.pipestat_sample_name,
record_identifier=self.pipestat_record_identifier,
result_formatter=rf,
)

Expand Down Expand Up @@ -1689,7 +1689,9 @@ def report_object(
val = {key: message_raw.replace("\t", " ")}

reported_result = self.pipestat.report(
values=val, record_identifier=self.pipestat_sample_name, result_formatter=rf
values=val,
record_identifier=self.pipestat_record_identifier,
result_formatter=rf,
)
if not nolog:
for r in reported_result:
Expand Down Expand Up @@ -1856,12 +1858,12 @@ def _refresh_stats(self):
_, data = read_yaml_data(path=self.pipeline_stats_file, what="stats_file")
print(data)
pipeline_key = list(
data[self._pipestat_manager.cfg["pipeline_name"]][
data[self._pipestat_manager.pipeline_name][
self.pipestat["_pipeline_type"]
]
)[0]
if self.name == pipeline_key:
for key, value in data[self._pipestat_manager.cfg["pipeline_name"]][
for key, value in data[self._pipestat_manager.pipeline_name][
self.pipestat["_pipeline_type"]
][pipeline_key].items():
self.stats_dict[key] = value.strip()
Expand Down Expand Up @@ -2032,7 +2034,7 @@ def fail_pipeline(self, exc: Exception, dynamic_recover: bool = False):
self.info("Total time: " + str(total_time))
self.info("Failure reason: " + str(exc))
self.pipestat.set_status(
record_identifier=self._pipestat_manager.cfg["record_identifier"],
record_identifier=self._pipestat_manager.record_identifier,
status_identifier="failed",
)

Expand Down Expand Up @@ -2093,7 +2095,7 @@ def stop_pipeline(self, status=COMPLETE_FLAG):
"""
# self._set_status_flag(status)
self.pipestat.set_status(
record_identifier=self._pipestat_manager.cfg["record_identifier"],
record_identifier=self._pipestat_manager.record_identifier,
status_identifier=status,
)
self._cleanup()
Expand Down Expand Up @@ -2464,8 +2466,8 @@ def _cleanup(self, dry_run=False):
for fn in glob.glob(self.outfolder + flag_name("*"))
if COMPLETE_FLAG not in os.path.basename(fn)
and not "{}_{}_{}".format(
self._pipestat_manager.cfg["pipeline_name"],
self.pipestat_sample_name,
self._pipestat_manager.pipeline_name,
self.pipestat_record_identifier,
run_flag,
)
== os.path.basename(fn)
Expand Down

0 comments on commit 5b46682

Please sign in to comment.