Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updated to pipestat 0.6.0 #199

Merged
merged 9 commits into from
Nov 10, 2023
2 changes: 1 addition & 1 deletion .github/workflows/run-pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10"]
python-version: ["3.8", "3.10"]
os: [ubuntu-latest]

steps:
Expand Down
38 changes: 21 additions & 17 deletions pypiper/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,6 @@ def __init__(

# pipestat setup
self.pipestat_sample_name = pipestat_sample_name or DEFAULT_SAMPLE_NAME
# getattr(self, "sample_name", DEFAULT_SAMPLE_NAME)

# don't force default pipestat_results_file value unless
# pipestat config not provided
Expand Down Expand Up @@ -437,7 +436,7 @@ def _completed(self):
:return bool: Whether the managed pipeline is in a completed state.
"""
return (
self.pipestat.get_status(self._pipestat_manager.sample_name)
self.pipestat.get_status(self._pipestat_manager.cfg["record_identifier"])
donaldcampbelljr marked this conversation as resolved.
Show resolved Hide resolved
== COMPLETE_FLAG
)

Expand All @@ -448,7 +447,10 @@ def _failed(self):

:return bool: Whether the managed pipeline is in a failed state.
"""
return self.pipestat.get_status(self._pipestat_manager.sample_name) == FAIL_FLAG
return (
self.pipestat.get_status(self._pipestat_manager.cfg["record_identifier"])
== FAIL_FLAG
)

@property
def halted(self):
Expand All @@ -457,7 +459,8 @@ def halted(self):
:return bool: Whether the managed pipeline is in a paused/halted state.
"""
return (
self.pipestat.get_status(self._pipestat_manager.sample_name) == PAUSE_FLAG
self.pipestat.get_status(self._pipestat_manager.cfg["record_identifier"])
== PAUSE_FLAG
)

@property
Expand Down Expand Up @@ -724,7 +727,7 @@ def start_pipeline(self, args=None, multi=False):
self.info("\n----------------------------------------\n")
self.status = "running"
self.pipestat.set_status(
record_identifier=self._pipestat_manager.sample_name,
record_identifier=self._pipestat_manager.cfg["record_identifier"],
status_identifier="running",
)

Expand Down Expand Up @@ -771,7 +774,7 @@ def _set_status_flag(self, status):
prev_status = self.status
self.status = status
self.pipestat.set_status(
record_identifier=self._pipestat_manager.sample_name,
record_identifier=self._pipestat_manager.cfg["record_identifier"],
donaldcampbelljr marked this conversation as resolved.
Show resolved Hide resolved
status_identifier=status,
)
self.debug("\nChanged status from {} to {}.".format(prev_status, self.status))
Expand All @@ -788,7 +791,7 @@ def _flag_file_path(self, status=None):
"""

flag_file_name = "{}_{}_{}".format(
self._pipestat_manager["_pipeline_name"],
self._pipestat_manager.cfg["pipeline_name"],
donaldcampbelljr marked this conversation as resolved.
Show resolved Hide resolved
self.pipestat_sample_name,
flag_name(status or self.status),
)
Expand Down Expand Up @@ -1421,7 +1424,7 @@ def _wait_for_lock(self, lock_file):
)
# self._set_status_flag(WAIT_FLAG)
self.pipestat.set_status(
record_identifier=self._pipestat_manager.sample_name,
record_identifier=self._pipestat_manager.cfg["record_identifier"],
status_identifier="waiting",
)
first_message_flag = True
Expand All @@ -1445,7 +1448,7 @@ def _wait_for_lock(self, lock_file):
self.timestamp("File unlocked.")
# self._set_status_flag(RUN_FLAG)
self.pipestat.set_status(
record_identifier=self._pipestat_manager.sample_name,
record_identifier=self._pipestat_manager.cfg["record_identifier"],
status_identifier="running",
)

Expand Down Expand Up @@ -1853,10 +1856,12 @@ def _refresh_stats(self):
_, data = read_yaml_data(path=self.pipeline_stats_file, what="stats_file")
print(data)
pipeline_key = list(
data[self.pipestat["_pipeline_name"]][self.pipestat["_pipeline_type"]]
data[self._pipestat_manager.cfg["pipeline_name"]][
self.pipestat["_pipeline_type"]
]
)[0]
if self.name == pipeline_key:
for key, value in data[self.pipestat["_pipeline_name"]][
for key, value in data[self._pipestat_manager.cfg["pipeline_name"]][
self.pipestat["_pipeline_type"]
][pipeline_key].items():
self.stats_dict[key] = value.strip()
Expand Down Expand Up @@ -1991,12 +1996,12 @@ def complete(self):
"""Stop a completely finished pipeline."""
self.stop_pipeline(status=COMPLETE_FLAG)

def fail_pipeline(self, exc, dynamic_recover=False):
def fail_pipeline(self, exc: Exception, dynamic_recover: bool = False):
"""
If the pipeline does not complete, this function will stop the pipeline gracefully.
It sets the status flag to failed and skips the normal success completion procedure.

:param Exception e: Exception to raise.
:param Exception exc: Exception to raise.
:param bool dynamic_recover: Whether to recover e.g. for job termination.
"""
# Take care of any active running subprocess
Expand Down Expand Up @@ -2026,9 +2031,8 @@ def fail_pipeline(self, exc, dynamic_recover=False):
total_time = datetime.timedelta(seconds=self.time_elapsed(self.starttime))
self.info("Total time: " + str(total_time))
self.info("Failure reason: " + str(exc))
# self._set_status_flag(FAIL_FLAG)
self.pipestat.set_status(
record_identifier=self._pipestat_manager.sample_name,
record_identifier=self._pipestat_manager.cfg["record_identifier"],
status_identifier="failed",
)

Expand Down Expand Up @@ -2089,7 +2093,7 @@ def stop_pipeline(self, status=COMPLETE_FLAG):
"""
# self._set_status_flag(status)
self.pipestat.set_status(
record_identifier=self._pipestat_manager.sample_name,
record_identifier=self._pipestat_manager.cfg["record_identifier"],
status_identifier=status,
)
self._cleanup()
Expand Down Expand Up @@ -2460,7 +2464,7 @@ def _cleanup(self, dry_run=False):
for fn in glob.glob(self.outfolder + flag_name("*"))
if COMPLETE_FLAG not in os.path.basename(fn)
and not "{}_{}_{}".format(
self._pipestat_manager["_pipeline_name"],
self._pipestat_manager.cfg["pipeline_name"],
self.pipestat_sample_name,
run_flag,
)
Expand Down
2 changes: 1 addition & 1 deletion pypiper/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ def run(self, start_point=None, stop_before=None, stop_after=None):
# between results from different stages.
skip_mode = False

print("Running stage: {}".format(stage))
print(f"Running stage: {getattr(stage, 'name', str(stage))}")

stage.run()
self.executed.append(stage)
Expand Down
2 changes: 2 additions & 0 deletions pypiper/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,8 @@ def _add_args(parser, args, required):
def result_formatter_markdown(pipeline_name, record_identifier, res_id, value) -> str:
"""
Returns Markdown formatted value as string

# Pipeline_name and record_identifier should be kept because pipestat needs it
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the input be changed to be more general **kwargs?

"""

message_markdown = "\n> `{key}`\t{value}\t_RES_".format(key=res_id, value=value)
Expand Down
3 changes: 1 addition & 2 deletions requirements/requirements-pypiper.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
attmap>=0.12.5
logmuse>=0.2.4
psutil
pandas
ubiquerg>=0.4.5
yacman
pipestat>=0.6.0a1
pipestat>=0.6.0a5
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ def read_reqs_file(reqs_name):
classifiers=[
"Development Status :: 4 - Beta",
"License :: OSI Approved :: BSD License",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Topic :: Scientific/Engineering :: Bio-Informatics",
],
author="Nathan Sheffield, Johanna Klughammer, Andre Rendeiro",
Expand Down
Loading