diff --git a/pypiper/manager.py b/pypiper/manager.py index 57af110..660ed85 100644 --- a/pypiper/manager.py +++ b/pypiper/manager.py @@ -329,7 +329,7 @@ def __init__( signal.signal(signal.SIGTERM, self._signal_term_handler) # pipestat setup - self.pipestat_sample_name = pipestat_sample_name or DEFAULT_SAMPLE_NAME + self.pipestat_record_identifier = pipestat_sample_name or DEFAULT_SAMPLE_NAME # don't force default pipestat_results_file value unless # pipestat config not provided @@ -343,7 +343,7 @@ def _get_arg(args_dict, arg_name): return None if arg_name not in args_dict else args_dict[arg_name] self._pipestat_manager = PipestatManager( - record_identifier=self.pipestat_sample_name + record_identifier=self.pipestat_record_identifier or _get_arg(args_dict, "pipestat_sample_name") or DEFAULT_SAMPLE_NAME, pipeline_name=self.name, @@ -436,7 +436,7 @@ def _completed(self): :return bool: Whether the managed pipeline is in a completed state. """ return ( - self.pipestat.get_status(self._pipestat_manager.cfg["record_identifier"]) + self.pipestat.get_status(self._pipestat_manager.record_identifier) == COMPLETE_FLAG ) @@ -448,7 +448,7 @@ def _failed(self): :return bool: Whether the managed pipeline is in a failed state. """ return ( - self.pipestat.get_status(self._pipestat_manager.cfg["record_identifier"]) + self.pipestat.get_status(self._pipestat_manager.record_identifier) == FAIL_FLAG ) @@ -459,7 +459,7 @@ def halted(self): :return bool: Whether the managed pipeline is in a paused/halted state. """ return ( - self.pipestat.get_status(self._pipestat_manager.cfg["record_identifier"]) + self.pipestat.get_status(self._pipestat_manager.record_identifier) == PAUSE_FLAG ) @@ -723,11 +723,11 @@ def start_pipeline(self, args=None, multi=False): results = self._pipestat_manager.__str__().split("\n") for i in results: self.info("* " + i) - self.info("* Sample name: " + self.pipestat_sample_name + "\n") + self.info("* Sample name: " + self.pipestat_record_identifier + "\n") self.info("\n----------------------------------------\n") self.status = "running" self.pipestat.set_status( - record_identifier=self._pipestat_manager.cfg["record_identifier"], + record_identifier=self._pipestat_manager.record_identifier, status_identifier="running", ) @@ -774,7 +774,7 @@ def _set_status_flag(self, status): prev_status = self.status self.status = status self.pipestat.set_status( - record_identifier=self._pipestat_manager.cfg["record_identifier"], + record_identifier=self._pipestat_manager.record_identifier, status_identifier=status, ) self.debug("\nChanged status from {} to {}.".format(prev_status, self.status)) @@ -791,8 +791,8 @@ def _flag_file_path(self, status=None): """ flag_file_name = "{}_{}_{}".format( - self._pipestat_manager.cfg["pipeline_name"], - self.pipestat_sample_name, + self._pipestat_manager.pipeline_name, + self.pipestat_record_identifier, flag_name(status or self.status), ) return pipeline_filepath(self, filename=flag_file_name) @@ -1424,7 +1424,7 @@ def _wait_for_lock(self, lock_file): ) # self._set_status_flag(WAIT_FLAG) self.pipestat.set_status( - record_identifier=self._pipestat_manager.cfg["record_identifier"], + record_identifier=self._pipestat_manager.record_identifier, status_identifier="waiting", ) first_message_flag = True @@ -1448,7 +1448,7 @@ def _wait_for_lock(self, lock_file): self.timestamp("File unlocked.") # self._set_status_flag(RUN_FLAG) self.pipestat.set_status( - record_identifier=self._pipestat_manager.cfg["record_identifier"], + record_identifier=self._pipestat_manager.record_identifier, status_identifier="running", ) @@ -1607,7 +1607,7 @@ def report_result(self, key, value, nolog=False, result_formatter=None): reported_result = self.pipestat.report( values={key: value}, - record_identifier=self.pipestat_sample_name, + record_identifier=self.pipestat_record_identifier, result_formatter=rf, ) @@ -1689,7 +1689,9 @@ def report_object( val = {key: message_raw.replace("\t", " ")} reported_result = self.pipestat.report( - values=val, record_identifier=self.pipestat_sample_name, result_formatter=rf + values=val, + record_identifier=self.pipestat_record_identifier, + result_formatter=rf, ) if not nolog: for r in reported_result: @@ -1856,12 +1858,12 @@ def _refresh_stats(self): _, data = read_yaml_data(path=self.pipeline_stats_file, what="stats_file") print(data) pipeline_key = list( - data[self._pipestat_manager.cfg["pipeline_name"]][ + data[self._pipestat_manager.pipeline_name][ self.pipestat["_pipeline_type"] ] )[0] if self.name == pipeline_key: - for key, value in data[self._pipestat_manager.cfg["pipeline_name"]][ + for key, value in data[self._pipestat_manager.pipeline_name][ self.pipestat["_pipeline_type"] ][pipeline_key].items(): self.stats_dict[key] = value.strip() @@ -2032,7 +2034,7 @@ def fail_pipeline(self, exc: Exception, dynamic_recover: bool = False): self.info("Total time: " + str(total_time)) self.info("Failure reason: " + str(exc)) self.pipestat.set_status( - record_identifier=self._pipestat_manager.cfg["record_identifier"], + record_identifier=self._pipestat_manager.record_identifier, status_identifier="failed", ) @@ -2093,7 +2095,7 @@ def stop_pipeline(self, status=COMPLETE_FLAG): """ # self._set_status_flag(status) self.pipestat.set_status( - record_identifier=self._pipestat_manager.cfg["record_identifier"], + record_identifier=self._pipestat_manager.record_identifier, status_identifier=status, ) self._cleanup() @@ -2464,8 +2466,8 @@ def _cleanup(self, dry_run=False): for fn in glob.glob(self.outfolder + flag_name("*")) if COMPLETE_FLAG not in os.path.basename(fn) and not "{}_{}_{}".format( - self._pipestat_manager.cfg["pipeline_name"], - self.pipestat_sample_name, + self._pipestat_manager.pipeline_name, + self.pipestat_record_identifier, run_flag, ) == os.path.basename(fn)