if first_message_flag is False:
self.timestamp("Waiting for file lock: " + lock_file)
- self.warning("This indicates that another process may be executing this "
+ self.warning(
+ "This indicates that another process may be executing this "
"command, or the pipeline was not properly shut down. If the "
"pipeline was not properly shut down last time, "
"you should restart it in 'recover' mode (-R) to indicate that "
- "this step should be restarted.")
- self._set_status_flag(WAIT_FLAG)
+ "this step should be restarted."
+ )
+ # self._set_status_flag(WAIT_FLAG)
+ self.pipestat.set_status(
+ sample_name=self._pipestat_manager.sample_name,
+ status_identifier="waiting",
+ )
first_message_flag = True
@@ -1121,7 +1441,11 @@ def _wait_for_lock(self, lock_file):
if first_message_flag:
self.timestamp("File unlocked.")
- self._set_status_flag(RUN_FLAG)
+ # self._set_status_flag(RUN_FLAG)
+ self.pipestat.set_status(
+ sample_name=self._pipestat_manager.sample_name,
+ status_identifier="running",
+ )
# Logging functions
@@ -1145,8 +1469,7 @@ def critical(self, msg, *args, **kwargs):
def fatal(self, msg, *args, **kwargs):
self._logger.fatal(msg, *args, **kwargs)
- def timestamp(self, message="", checkpoint=None,
- finished=False, raise_error=True):
+ def timestamp(self, message="", checkpoint=None, finished=False, raise_error=True):
Print message, time, and time elapsed, perhaps creating checkpoint.
@@ -1189,7 +1512,9 @@ def timestamp(self, message="", checkpoint=None,
self.curr_checkpoint = checkpoint
# Handle the two halting conditions.
- if (finished and checkpoint == self.stop_after) or (not finished and checkpoint == self.stop_before):
+ if (finished and checkpoint == self.stop_after) or (
+ not finished and checkpoint == self.stop_before
+ ):
self.halt(checkpoint, finished, raise_error=raise_error)
# Determine if we've started executing.
elif checkpoint == self.start_point:
@@ -1203,13 +1528,17 @@ def timestamp(self, message="", checkpoint=None,
elapsed = self.time_elapsed(self.last_timestamp)
t = time.strftime("%m-%d %H:%M:%S")
if checkpoint is None:
- msg = "{m} ({t}) elapsed: {delta_t} _TIME_".\
- format(m=message, t=t, delta_t=elapsed)
+ msg = "{m} ({t}) elapsed: {delta_t} _TIME_".format(
+ m=message, t=t, delta_t=elapsed
+ )
- msg = "{m} ({t}) ({status} {stage}) elapsed: {delta_t} _TIME_".\
- format(m=message, t=t,
- status="finished" if finished else "starting",
- stage=checkpoint, delta_t=elapsed)
+ msg = "{m} ({t}) ({status} {stage}) elapsed: {delta_t} _TIME_".format(
+ m=message,
+ t=t,
+ status="finished" if finished else "starting",
+ stage=checkpoint,
+ delta_t=elapsed,
+ )
if re.match("^###", message):
msg = "\n{}\n".format(msg)
@@ -1224,59 +1553,78 @@ def time_elapsed(time_since):
return round(time.time() - time_since, 0)
- def _report_profile(self, command, lock_name, elapsed_time, memory, pid, args_hash, proc_count):
+ def _report_profile(
+ self, command, lock_name, elapsed_time, memory, pid, args_hash, proc_count
+ ):
Writes a string to self.pipeline_profile_file.
- rel_lock_name = lock_name if lock_name is None else os.path.relpath(lock_name, self.outfolder)
- message_raw = str(pid) + "\t" + \
- str(args_hash) + "\t" + \
- str(proc_count) + "\t" + \
- str(datetime.timedelta(seconds=round(elapsed_time, 2))) + "\t " + \
- str(round(memory, 4)) + "\t" + \
- str(command) + "\t" + \
- str(rel_lock_name)
+ rel_lock_name = (
+ lock_name
+ if lock_name is None
+ else os.path.relpath(lock_name, self.outfolder)
+ )
+ message_raw = (
+ str(pid)
+ + "\t"
+ + str(args_hash)
+ + "\t"
+ + str(proc_count)
+ + "\t"
+ + str(datetime.timedelta(seconds=round(elapsed_time, 2)))
+ + "\t "
+ + str(round(memory, 4))
+ + "\t"
+ + str(command)
+ + "\t"
+ + str(rel_lock_name)
+ )
with open(self.pipeline_profile_file, "a") as myfile:
myfile.write(message_raw + "\n")
- def report_result(self, key, value, annotation=None, nolog=False):
+ def report_result(self, key, value, nolog=False, result_formatter=None):
- Writes a string to self.pipeline_stats_file.
+ Writes a key:value pair to self.pipeline_stats_file.
:param str key: name (key) of the stat
- :param str annotation: By default, the stats will be annotated with the
- pipeline name, so you can tell which pipeline records which stats.
- If you want, you can change this; use annotation='shared' if you
- need the stat to be used by another pipeline (using get_stat()).
+ :param dict value: value of the stat to report.
:param bool nolog: Turn on this flag to NOT print this result in the
logfile. Use sparingly in case you will be printing the result in a
different format.
- """
- # Default annotation is current pipeline name.
- annotation = str(annotation or self.name)
- # In case the value is passed with trailing whitespace.
- value = str(value).strip()
+ :param str result_formatter: function for formatting via pipestat backend
+ :return str reported_result: the reported result is returned as a list of formatted strings.
+ """
# keep the value in memory:
self.stats_dict[key] = value
- message_raw = "{key}\t{value}\t{annotation}".format(
- key=key, value=value, annotation=annotation)
- message_markdown = "\n> `{key}`\t{value}\t{annotation}\t_RES_".format(
- key=key, value=value, annotation=annotation)
+ rf = result_formatter or self.pipestat_result_formatter
+ reported_result = self.pipestat.report(
+ values={key: value},
+ sample_name=self.pipestat_sample_name,
+ result_formatter=rf,
+ )
if not nolog:
- self.info(message_markdown)
+ for r in reported_result:
+ self.info(r)
- # Just to be extra careful, let's lock the file while we we write
- # in case multiple pipelines write to the same file.
- self._safe_write_to_file(self.pipeline_stats_file, message_raw)
+ return reported_result
- def report_object(self, key, filename, anchor_text=None, anchor_image=None, annotation=None):
+ def report_object(
+ self,
+ key,
+ filename,
+ anchor_text=None,
+ anchor_image=None,
+ annotation=None,
+ nolog=False,
+ result_formatter=None,
+ ):
- Writes a string to self.pipeline_objects_file. Used to report figures
- and others.
+ Writes a key:value pair to self.pipeline_stats_file. Note: this function
+ will be deprecated. Using report_result is recommended.
:param str key: name (key) of the object
:param str filename: relative path to the file (relative to parent
@@ -1289,74 +1637,63 @@ def report_object(self, key, filename, anchor_text=None, anchor_image=None, anno
:param str annotation: By default, the figures will be annotated with
the pipeline name, so you can tell which pipeline records which
figures. If you want, you can change this.
- """
+ :param bool nolog: Turn on this flag to NOT print this result in the
+ logfile. Use sparingly in case you will be printing the result in a
+ different format.
+ :param str result_formatter: function for formatting via pipestat backend
+ :return str reported_result: the reported result is returned as a list of formatted strings.
+ """
+ warnings.warn(
+ "This function may be removed in future release. "
+ "The recommended way to report pipeline results is using PipelineManager.pipestat.report().",
+ category=DeprecationWarning,
+ )
+ rf = result_formatter or self.pipestat_result_formatter
# Default annotation is current pipeline name.
annotation = str(annotation or self.name)
# In case the value is passed with trailing whitespace.
filename = str(filename).strip()
if anchor_text:
anchor_text = str(anchor_text).strip()
anchor_text = str(key).strip()
# better to use a relative path in this file
# convert any absolute paths into relative paths
- relative_filename = os.path.relpath(filename, self.outfolder) \
- if os.path.isabs(filename) else filename
+ relative_filename = (
+ os.path.relpath(filename, self.outfolder)
+ if os.path.isabs(filename)
+ else filename
+ )
if anchor_image:
- relative_anchor_image = os.path.relpath(anchor_image, self.outfolder) \
- if os.path.isabs(anchor_image) else anchor_image
+ relative_anchor_image = (
+ os.path.relpath(anchor_image, self.outfolder)
+ if os.path.isabs(anchor_image)
+ else anchor_image
+ )
relative_anchor_image = "None"
- message_raw = "{key}\t{filename}\t{anchor_text}\t{anchor_image}\t{annotation}".format(
- key=key, filename=relative_filename, anchor_text=anchor_text,
- anchor_image=relative_anchor_image, annotation=annotation)
- message_markdown = "> `{key}`\t{filename}\t{anchor_text}\t{anchor_image}\t{annotation}\t_OBJ_".format(
- key=key, filename=relative_filename, anchor_text=anchor_text,
- anchor_image=relative_anchor_image, annotation=annotation)
- self.warning(message_markdown)
+ message_raw = "{filename}\t{anchor_text}\t{anchor_image}\t{annotation}".format(
+ filename=relative_filename,
+ anchor_text=anchor_text,
+ anchor_image=relative_anchor_image,
+ annotation=annotation,
+ )
- self._safe_write_to_file(self.pipeline_objects_file, message_raw)
- def _safe_write_to_file(self, file, message):
- """
- Writes a string to a file safely (with file locks).
- """
- target = file
- lock_name = make_lock_name(target, self.outfolder)
- lock_file = self._make_lock_path(lock_name)
+ val = {key: message_raw.replace("\t", " ")}
- while True:
- if os.path.isfile(lock_file):
- self._wait_for_lock(lock_file)
- else:
- try:
- self.locks.append(lock_file)
- self._create_file_racefree(lock_file)
- except OSError as e:
- if e.errno == errno.EEXIST:
- self.warning("Lock file created after test! Looping again.")
- continue # Go back to start
- # Proceed with file writing
- with open(file, "a") as myfile:
- myfile.write(message + "\n")
- os.remove(lock_file)
- self.locks.remove(lock_file)
- # If you make it to the end of the while loop, you're done
- break
+ reported_result = self.pipestat.report(
+ values=val, sample_name=self.pipestat_sample_name, result_formatter=rf
+ )
+ if not nolog:
+ for r in reported_result:
+ self.info(r)
+ return reported_result
def _report_command(self, cmd, procs=None):
- Writes a command to both stdout and to the commands log file
+ Writes a command to both stdout and to the commands log file
:param str cmd: command to report
@@ -1385,22 +1722,22 @@ def _report_command(self, cmd, procs=None):
def _create_file(file):
- Creates a file, but will not fail if the file already exists.
- This is vulnerable to race conditions; use this for cases where it
+ Creates a file, but will not fail if the file already exists.
+ This is vulnerable to race conditions; use this for cases where it
doesn't matter if this process is the one that created the file.
:param str file: File to create.
- with open(file, 'w') as fout:
- fout.write('')
+ with open(file, "w") as fout:
+ fout.write("")
def _create_file_racefree(file):
Creates a file, but fails if the file already exists.
- This function will thus only succeed if this process actually creates
- the file; if the file already exists, it will cause an OSError,
+ This function will thus only succeed if this process actually creates
+ the file; if the file already exists, it will cause an OSError,
solving race conditions.
:param str file: File to create.
@@ -1411,15 +1748,18 @@ def _create_file_racefree(file):
def _ensure_lock_prefix(lock_name_base):
- """ Ensure that an alleged lock file is correctly prefixed. """
- return lock_name_base if lock_name_base.startswith(LOCK_PREFIX) \
- else LOCK_PREFIX + lock_name_base
+ """Ensure that an alleged lock file is correctly prefixed."""
+ return (
+ lock_name_base
+ if lock_name_base.startswith(LOCK_PREFIX)
+ else LOCK_PREFIX + lock_name_base
+ )
def _make_lock_path(self, lock_name_base):
Create path to lock file with given name as base.
- :param str lock_name_base: Lock file name, designed to not be prefixed
+ :param str lock_name_base: Lock file name, designed to not be prefixed
with the lock file designation, but that's permitted.
:return str: Path to the lock file.
@@ -1436,8 +1776,8 @@ def _make_lock_path(self, lock_name_base):
def _recoverfile_from_lockfile(self, lockfile):
Create path to recovery file with given name as base.
- :param str lockfile: Name of file on which to base this path,
+ :param str lockfile: Name of file on which to base this path,
perhaps already prefixed with the designation of a lock file.
:return str: Path to recovery file.
@@ -1453,7 +1793,7 @@ def make_sure_path_exists(path):
Creates all directories in a path if it does not exist.
:param str path: Path to create.
- :raises Exception: if the path creation attempt hits an error with
+ :raises Exception: if the path creation attempt hits an error with
a code indicating a cause other than pre-existence.
@@ -1468,41 +1808,32 @@ def make_sure_path_exists(path):
def _refresh_stats(self):
- Loads up the stats sheet created for this pipeline run and reads
+ Loads up the stats yaml created for this pipeline run and reads
those stats into memory
- # regex identifies all possible stats files.
- #regex = self.outfolder + "*_stats.tsv"
- #stats_files = glob.glob(regex)
- #stats_files.insert(self.pipeline_stats_file) # last one is the current pipeline
- #for stats_file in stats_files:
- stats_file = self.pipeline_stats_file
if os.path.isfile(self.pipeline_stats_file):
- with open(stats_file, 'r') as stat_file:
- for line in stat_file:
- try:
- # Someone may have put something that's not 3 columns in the stats file
- # if so, shame on him, but we can just ignore it.
- key, value, annotation = line.split('\t')
- except ValueError:
- self.warning("WARNING: Each row in a stats file is expected to have 3 columns")
- if annotation.rstrip() == self.name or annotation.rstrip() == "shared":
- self.stats_dict[key] = value.strip()
- #if os.path.isfile(self.pipeline_stats_file):
+ _, data = read_yaml_data(path=self.pipeline_stats_file, what="stats_file")
+ print(data)
+ pipeline_key = list(
+ data[self.pipestat["_pipeline_name"]][self.pipestat["_pipeline_type"]]
+ )[0]
+ if self.name == pipeline_key:
+ for key, value in data[self.pipestat["_pipeline_name"]][
+ self.pipestat["_pipeline_type"]
+ ][pipeline_key].items():
+ self.stats_dict[key] = value.strip()
def get_stat(self, key):
Returns a stat that was previously reported. This is necessary for reporting new stats that are
- derived from two stats, one of which may have been reported by an earlier run. For example,
+ derived from two stats, one of which may have been reported by an earlier run. For example,
if you first use report_result to report (number of trimmed reads), and then in a later stage
- want to report alignment rate, then this second stat (alignment rate) will require knowing the
+ want to report alignment rate, then this second stat (alignment rate) will require knowing the
first stat (number of trimmed reads); however, that may not have been calculated in the current
- pipeline run, so we must retrieve it from the stats.tsv output file. This command will retrieve
+ pipeline run, so we must retrieve it from the stats.yaml output file. This command will retrieve
such previously reported stats if they were not already calculated in the current pipeline run.
- :param key: key of stat to retrieve
+ :param key: key of stat to retrieve
@@ -1562,9 +1893,12 @@ def _checkpoint(self, stage):
# be expected to characterize the extension of a file name/path.
base, ext = os.path.splitext(stage)
if ext and "." not in base:
- self.warning("WARNING: '{}' looks like it may be the name or path of "
- "a file; for such a checkpoint, use touch_checkpoint.".
- format(stage))
+ self.warning(
+ "WARNING: '{}' looks like it may be the name or path of "
+ "a file; for such a checkpoint, use touch_checkpoint.".format(
+ stage
+ )
+ )
if not is_checkpoint:
self.warning("Not a checkpoint: {}".format(stage))
@@ -1596,9 +1930,12 @@ def _touch_checkpoint(self, check_file):
other_folder = os.path.join(folder, "")
this_folder = os.path.join(self.outfolder, "")
if other_folder != this_folder:
- errmsg = "Path provided as checkpoint file isn't in pipeline " \
- "output folder. '{}' is not in '{}'".format(
- check_file, self.outfolder)
+ errmsg = (
+ "Path provided as checkpoint file isn't in pipeline "
+ "output folder. '{}' is not in '{}'".format(
+ check_file, self.outfolder
+ )
+ )
raise ValueError(errmsg)
fpath = check_file
@@ -1607,14 +1944,14 @@ def _touch_checkpoint(self, check_file):
# Create/update timestamp for checkpoint, but base return value on
# whether the action was a simple update or a novel creation.
already_exists = os.path.isfile(fpath)
- open(fpath, 'w').close()
+ open(fpath, "w").close()
action = "Updated" if already_exists else "Created"
self.info("{} checkpoint file: '{}'".format(action, fpath))
return already_exists
def complete(self):
- """ Stop a completely finished pipeline. """
+ """Stop a completely finished pipeline."""
def fail_pipeline(self, exc, dynamic_recover=False):
@@ -1652,7 +1989,11 @@ def fail_pipeline(self, exc, dynamic_recover=False):
total_time = datetime.timedelta(seconds=self.time_elapsed(self.starttime))
self.info("Total time: " + str(total_time))
self.info("Failure reason: " + str(exc))
- self._set_status_flag(FAIL_FLAG)
+ # self._set_status_flag(FAIL_FLAG)
+ self.pipestat.set_status(
+ sample_name=self._pipestat_manager.sample_name,
+ status_identifier="failed",
+ )
if isinstance(exc, str):
exc = RuntimeError(exc)
@@ -1683,16 +2024,21 @@ def get_elapsed_time(self):
:return int: sum of runtimes in seconds
if os.path.isfile(self.pipeline_profile_file):
- df = _pd.read_csv(self.pipeline_profile_file, sep="\t", comment="#", names=PROFILE_COLNAMES)
+ df = _pd.read_csv(
+ self.pipeline_profile_file,
+ sep="\t",
+ comment="#",
+ )
- df['runtime'] = _pd.to_timedelta(df['runtime'])
+ df["runtime"] = _pd.to_timedelta(df["runtime"])
except ValueError:
# return runtime estimate
# this happens if old profile style is mixed with the new one
# and the columns do not match
return self.time_elapsed(self.starttime)
- unique_df = df[~df.duplicated('cid', keep='last').values]
- return sum(unique_df['runtime'].apply(lambda x: x.total_seconds()))
+ unique_df = df[~df.duplicated("cid", keep="last").values]
+ return sum(unique_df["runtime"].apply(lambda x: x.total_seconds()))
return self.time_elapsed(self.starttime)
def stop_pipeline(self, status=COMPLETE_FLAG):
@@ -1701,30 +2047,41 @@ def stop_pipeline(self, status=COMPLETE_FLAG):
This is the "healthy" pipeline completion function.
The normal pipeline completion function, to be run by the pipeline
- at the end of the script. It sets status flag to completed and records
+ at the end of the script. It sets status flag to completed and records
some time and memory statistics to the log file.
- self._set_status_flag(status)
+ # self._set_status_flag(status)
+ self.pipestat.set_status(
+ sample_name=self._pipestat_manager.sample_name, status_identifier=status
+ )
- elapsed_time_this_run = str(datetime.timedelta(seconds=self.time_elapsed(self.starttime)))
- self.report_result("Time",
- elapsed_time_this_run,
- nolog=True)
- self.report_result("Success",
- time.strftime("%m-%d-%H:%M:%S"),
- nolog=True)
+ elapsed_time_this_run = str(
+ datetime.timedelta(seconds=self.time_elapsed(self.starttime))
+ )
+ self.report_result("Time", elapsed_time_this_run, nolog=True)
+ self.report_result("Success", time.strftime("%m-%d-%H:%M:%S"), nolog=True)
self.info("\n### Pipeline completed. Epilogue")
# print("* " + "Total elapsed time".rjust(20) + ": "
# + str(datetime.timedelta(seconds=self.time_elapsed(self.starttime))))
- self.info("* " + "Elapsed time (this run)".rjust(30) + ": " +
- elapsed_time_this_run)
- self.info("* " + "Total elapsed time (all runs)".rjust(30) + ": " +
- str(datetime.timedelta(seconds=round(self.get_elapsed_time()))))
- self.info("* " + "Peak memory (this run)".rjust(30) + ": " +
- str(round(self.peak_memory, 4)) + " GB")
- # self.info("* " + "Total peak memory (all runs)".rjust(30) + ": " +
- # str(round(self.peak_memory, 4)) + " GB")
+ self.info(
+ "* " + "Elapsed time (this run)".rjust(30) + ": " + elapsed_time_this_run
+ )
+ self.info(
+ "* "
+ + "Total elapsed time (all runs)".rjust(30)
+ + ": "
+ + str(datetime.timedelta(seconds=round(self.get_elapsed_time())))
+ )
+ self.info(
+ "* "
+ + "Peak memory (this run)".rjust(30)
+ + ": "
+ + str(round(self.peak_memory, 4))
+ + " GB"
+ )
+ # self.info("* " + "Total peak memory (all runs)".rjust(30) + ": " +
+ # str(round(self.peak_memory, 4)) + " GB")
if self.halted:
@@ -1745,7 +2102,7 @@ def _signal_term_handler(self, signal, frame):
signal_type = "SIGTERM"
def _generic_signal_handler(self, signal_type):
Function for handling both SIGTERM and SIGINT
@@ -1764,7 +2121,7 @@ def _generic_signal_handler(self, signal_type):
# passed directly to the tee subprocess, so I could handle that on
# my own; hence, now I believe I no longer need to do this. I'm
# leaving this code here as a relic in case something comes up.
- #with open(self.pipeline_log_file, "a") as myfile:
+ # with open(self.pipeline_log_file, "a") as myfile:
# myfile.write(message + "\n")
def _signal_int_handler(self, signal, frame):
@@ -1799,10 +2156,9 @@ def _exit_handler(self):
self.fail_pipeline(Exception("Pipeline failure. See details above."))
if self.tee:
- self.tee.kill()
+ self.tee.kill()
def _terminate_running_subprocesses(self):
# make a copy of the list to iterate over since we'll be removing items
for pid in self.running_procs.copy():
proc_dict = self.running_procs[pid]
@@ -1810,9 +2166,18 @@ def _terminate_running_subprocesses(self):
# Close the preformat tag that we opened when the process was spawned.
# record profile of any running processes before killing
elapsed_time = time.time() - self.running_procs[pid]["start_time"]
- process_peak_mem = self._memory_usage(pid, container=proc_dict["container"])/1e6
- self._report_profile(self.running_procs[pid]["proc_name"], None, elapsed_time, process_peak_mem, pid,
- self.running_procs[pid]["args_hash"], self.running_procs[pid]["local_proc_id"])
+ process_peak_mem = (
+ self._memory_usage(pid, container=proc_dict["container"]) / 1e6
+ )
+ self._report_profile(
+ self.running_procs[pid]["proc_name"],
+ None,
+ elapsed_time,
+ process_peak_mem,
+ pid,
+ self.running_procs[pid]["args_hash"],
+ self.running_procs[pid]["local_proc_id"],
+ )
self._kill_child_process(pid, proc_dict["proc_name"])
del self.running_procs[pid]
@@ -1842,10 +2207,10 @@ def pskill(proc_pid, sig=signal.SIGINT):
if proc_name:
proc_string = " ({proc_name})".format(proc_name=proc_name)
- # First a gentle kill
+ # First a gentle kill
still_running = self._attend_process(psutil.Process(child_pid), 0)
- sleeptime = .25
+ sleeptime = 0.25
time_waiting = 0
while still_running and time_waiting < 3:
@@ -1873,9 +2238,12 @@ def pskill(proc_pid, sig=signal.SIGINT):
if still_running:
# still running!?
- self.warning("Child process {child_pid}{proc_string} never responded"
- "I just can't take it anymore. I don't know what to do...".format(child_pid=child_pid,
- proc_string=proc_string))
+ self.warning(
+ "Child process {child_pid}{proc_string} never responded"
+ "I just can't take it anymore. I don't know what to do...".format(
+ child_pid=child_pid, proc_string=proc_string
+ )
+ )
if time_waiting > 0:
note = "terminated after {time} sec".format(time=int(time_waiting))
@@ -1883,12 +2251,13 @@ def pskill(proc_pid, sig=signal.SIGINT):
note = "was already terminated"
msg = "Child process {child_pid}{proc_string} {note}.".format(
- child_pid=child_pid, proc_string=proc_string, note=note)
+ child_pid=child_pid, proc_string=proc_string, note=note
+ )
def _atexit_register(*args):
- """ Convenience alias to register exit functions without having to import atexit in the pipeline. """
+ """Convenience alias to register exit functions without having to import atexit in the pipeline."""
def get_container(self, image, mounts):
@@ -1954,11 +2323,17 @@ def clean_add(self, regex, conditional=False, manual=False):
with open(self.cleanup_file, "a") as myfile:
if os.path.isabs(filename):
- relative_filename = os.path.relpath(filename, self.outfolder)
+ relative_filename = os.path.relpath(
+ filename, self.outfolder
+ )
absolute_filename = filename
- relative_filename = os.path.relpath(filename, self.outfolder)
- absolute_filename = os.path.abspath(os.path.join(self.outfolder, relative_filename))
+ relative_filename = os.path.relpath(
+ filename, self.outfolder
+ )
+ absolute_filename = os.path.abspath(
+ os.path.join(self.outfolder, relative_filename)
+ )
if os.path.isfile(absolute_filename):
# print("Adding file to cleanup: {}".format(filename))
myfile.write("rm " + relative_filename + "\n")
@@ -1969,9 +2344,15 @@ def clean_add(self, regex, conditional=False, manual=False):
# and the directory itself
myfile.write("rmdir " + relative_filename + "\n")
- self.info("File not added to cleanup: {}".format(relative_filename))
+ self.info(
+ "File not added to cleanup: {}".format(
+ relative_filename
+ )
+ )
except Exception as e:
- self.error("Error in clean_add on path {}: {}".format(filename, str(e)))
+ self.error(
+ "Error in clean_add on path {}: {}".format(filename, str(e))
+ )
elif conditional:
@@ -1998,9 +2379,11 @@ def _cleanup(self, dry_run=False):
n_to_clean_cond = len(self.cleanup_list_conditional)
if n_to_clean + n_to_clean_cond > 0:
- self.info("Starting cleanup: {} files; {} conditional files for cleanup".format(
- n_to_clean,
- n_to_clean_cond))
+ self.info(
+ "Starting cleanup: {} files; {} conditional files for cleanup".format(
+ n_to_clean, n_to_clean_cond
+ )
+ )
self.debug("No files to clean.")
@@ -2034,9 +2417,17 @@ def _cleanup(self, dry_run=False):
if n_to_clean_cond > 0:
run_flag = flag_name(RUN_FLAG)
- flag_files = [fn for fn in glob.glob(self.outfolder + flag_name("*"))
- if COMPLETE_FLAG not in os.path.basename(fn)
- and not "{}_{}".format(self.name, run_flag) == os.path.basename(fn)]
+ flag_files = [
+ fn
+ for fn in glob.glob(self.outfolder + flag_name("*"))
+ if COMPLETE_FLAG not in os.path.basename(fn)
+ and not "{}_{}_{}".format(
+ self._pipestat_manager["_pipeline_name"],
+ self.pipestat_sample_name,
+ run_flag,
+ )
+ == os.path.basename(fn)
+ ]
if len(flag_files) == 0 and not dry_run:
self.info("\nCleaning up conditional list. . .")
for expr in self.cleanup_list_conditional:
@@ -2055,9 +2446,14 @@ def _cleanup(self, dry_run=False):
- self.info("\nConditional flag found: " + str([os.path.basename(i) for i in flag_files]))
- self.info("\nThese conditional files were left in place:\n\n- " +
- "\n- ".join(self.cleanup_list_conditional))
+ self.info(
+ "\nConditional flag found: "
+ + str([os.path.basename(i) for i in flag_files])
+ )
+ self.info(
+ "\nThese conditional files were left in place:\n\n- "
+ + "\n- ".join(self.cleanup_list_conditional)
+ )
# Produce a cleanup script.
no_cleanup_script = []
for cleandir in self.cleanup_list_conditional:
@@ -2071,10 +2467,13 @@ def _cleanup(self, dry_run=False):
clean_script.write("rmdir " + clean_item + "\n")
except Exception as e:
- if no_cleanup_script:
- self.warning('\n\nCould not produce cleanup script for item(s):\n\n- ' + '\n- '.join(no_cleanup_script))
+ if no_cleanup_script:
+ self.warning(
+ "\n\nCould not produce cleanup script for item(s):\n\n- "
+ + "\n- ".join(no_cleanup_script)
+ )
- def _memory_usage(self, pid='self', category="hwm", container=None):
+ def _memory_usage(self, pid="self", category="hwm", container=None):
Memory usage of the process in kilobytes.
@@ -2087,8 +2486,8 @@ def _memory_usage(self, pid='self', category="hwm", container=None):
cmd = "docker stats " + container + " --format '{{.MemUsage}}' --no-stream"
mem_use_str = subprocess.check_output(cmd, shell=True).decode()
- mem_num = re.findall('[\d\.]+', mem_use_str.split("/")[0])[0]
- mem_scale = re.findall('[A-Za-z]+', mem_use_str.split("/")[0])[0]
+ mem_num = re.findall("[\d\.]+", mem_use_str.split("/")[0])[0]
+ mem_scale = re.findall("[A-Za-z]+", mem_use_str.split("/")[0])[0]
mem_num = float(mem_num)
if mem_scale == "GiB":
@@ -2103,13 +2502,13 @@ def _memory_usage(self, pid='self', category="hwm", container=None):
# Thanks Martin Geisler:
status = None
- result = {'peak': 0, 'rss': 0, 'hwm': 0}
+ result = {"peak": 0, "rss": 0, "hwm": 0}
# This will only work on systems with a /proc file system
# (like Linux).
# status = open('/proc/self/status')
- proc_spot = '/proc/%s/status' % pid
+ proc_spot = "/proc/%s/status" % pid
status = open(proc_spot)
for line in status:
parts = line.split()
@@ -2126,13 +2525,17 @@ def _memory_usage(self, pid='self', category="hwm", container=None):
return result[category]
def _triage_error(self, e, nofail):
- """ Print a message and decide what to do about an error. """
+ """Print a message and decide what to do about an error."""
if not nofail:
elif self._failed:
- self.info("This is a nofail process, but the pipeline was terminated for other reasons, so we fail.")
+ self.info(
+ "This is a nofail process, but the pipeline was terminated for other reasons, so we fail."
+ )
raise e
- self.error("ERROR: Subprocess returned nonzero result, but pipeline is continuing because nofail=True")
+ self.error(
+ "ERROR: Subprocess returned nonzero result, but pipeline is continuing because nofail=True"
+ )
# TODO: return nonzero, or something. . .?
diff --git a/pypiper/ngstk.py b/pypiper/ngstk.py
index dcc57e8e..329b321b 100755
--- a/pypiper/ngstk.py
+++ b/pypiper/ngstk.py
@@ -1,11 +1,13 @@
""" Broadly applicable NGS processing/analysis functionality """
+import errno
import os
import re
import subprocess
-import errno
from attmap import AttMapEcho
from yacman import load_yaml
from .exceptions import UnsupportedFiletypeException
from .utils import is_fastq, is_gzipped_fastq, is_sam_or_bam
@@ -43,7 +45,8 @@ def __init__(self, config_file=None, pm=None):
# parse yaml into the project's attributes
# self.add_entries(**config)
super(NGSTk, self).__init__(
- None if config_file is None else load_yaml(config_file))
+ None if config_file is None else load_yaml(config_file)
+ )
# Keep a link to the pipeline manager, if one is provided.
# if None is provided, instantiate "tools" and "parameters" with empty AttMaps
@@ -63,12 +66,15 @@ def __init__(self, config_file=None, pm=None):
self.parameters = AttMapEcho()
# If pigz is available, use that. Otherwise, default to gzip.
- if hasattr(self.pm, "cores") and self.pm.cores > 1 and self.check_command("pigz"):
+ if (
+ hasattr(self.pm, "cores")
+ and self.pm.cores > 1
+ and self.check_command("pigz")
+ ):
self.ziptool_cmd = "pigz -f -p {}".format(self.pm.cores)
self.ziptool_cmd = "gzip -f"
def _ensure_folders(self, *paths):
Ensure that paths to folder(s) exist.
@@ -90,7 +96,6 @@ def _ensure_folders(self, *paths):
# Otherwise, just ensure that we have path to file's folder.
self.make_dir(fpath if ext else p)
def ziptool(self):
@@ -100,7 +105,6 @@ def ziptool(self):
return self.ziptool_cmd
def make_dir(self, path):
Forge path to directory, creating intermediates as needed.
@@ -113,12 +117,10 @@ def make_dir(self, path):
if exception.errno != errno.EEXIST:
def make_sure_path_exists(self, path):
- """ Alias for make_dir """
+ """Alias for make_dir"""
# Borrowed from looper
def check_command(self, command):
@@ -126,7 +128,9 @@ def check_command(self, command):
# Use `command` to see if command is callable, store exit code
- code = os.system("command -v {0} >/dev/null 2>&1 || {{ exit 1; }}".format(command))
+ code = os.system(
+ "command -v {0} >/dev/null 2>&1 || {{ exit 1; }}".format(command)
+ )
# If exit code is not 0, report which command failed and return False, else return True
if code != 0:
@@ -135,7 +139,6 @@ def check_command(self, command):
return True
def get_file_size(self, filenames):
Get size of all files in string (space-separated) in megabytes (Mb).
@@ -149,10 +152,15 @@ def get_file_size(self, filenames):
if type(filenames) is list:
return sum([self.get_file_size(filename) for filename in filenames])
- return round(sum([float(os.stat(f).st_size) for f in filenames.split(" ")]) / (1024 ** 2), 4)
+ return round(
+ sum([float(os.stat(f).st_size) for f in filenames.split(" ")])
+ / (1024**2),
+ 4,
+ )
- def mark_duplicates(self, aligned_file, out_file, metrics_file, remove_duplicates="True"):
+ def mark_duplicates(
+ self, aligned_file, out_file, metrics_file, remove_duplicates="True"
+ ):
cmd = self.tools.java
if self.pm.javamem: # If a memory restriction exists.
cmd += " -Xmx" + self.pm.javamem
@@ -163,9 +171,9 @@ def mark_duplicates(self, aligned_file, out_file, metrics_file, remove_duplicate
cmd += " REMOVE_DUPLICATES=" + remove_duplicates
return cmd
- def bam2fastq(self, input_bam, output_fastq,
- output_fastq2=None, unpaired_fastq=None):
+ def bam2fastq(
+ self, input_bam, output_fastq, output_fastq2=None, unpaired_fastq=None
+ ):
Create command to convert BAM(s) to FASTQ(s).
@@ -185,7 +193,6 @@ def bam2fastq(self, input_bam, output_fastq,
cmd += " UNPAIRED_FASTQ={0}".format(unpaired_fastq)
return cmd
def bam_to_fastq(self, bam_file, out_fastq_pre, paired_end):
Build command to convert BAM file to FASTQ file(s) (R1/R2).
@@ -209,11 +216,10 @@ def bam_to_fastq(self, bam_file, out_fastq_pre, paired_end):
return cmd
def bam_to_fastq_awk(self, bam_file, out_fastq_pre, paired_end, zipmode=False):
- This converts bam file to fastq files, but using awk. As of 2016, this is much faster
- than the standard way of doing this using Picard, and also much faster than the
+ This converts bam file to fastq files, but using awk. As of 2016, this is much faster
+ than the standard way of doing this using Picard, and also much faster than the
bedtools implementation as well; however, it does no sanity checks and assumes the reads
(for paired data) are all paired (no singletons), in the correct order.
:param bool zipmode: Should the output be zipped?
@@ -222,29 +228,27 @@ def bam_to_fastq_awk(self, bam_file, out_fastq_pre, paired_end, zipmode=False):
fq1 = out_fastq_pre + "_R1.fastq"
fq2 = out_fastq_pre + "_R2.fastq"
if zipmode:
fq1 = fq1 + ".gz"
fq2 = fq2 + ".gz"
- fq1_target = " | \"" + self.ziptool + " -c > " + fq1 + '"'
- fq2_target = " | \"" + self.ziptool + " -c > " + fq2 + '"'
+ fq1_target = ' | "' + self.ziptool + " -c > " + fq1 + '"'
+ fq2_target = ' | "' + self.ziptool + " -c > " + fq2 + '"'
fq1_target = ' > "' + fq1 + '"'
fq2_target = ' > "' + fq2 + '"'
if paired_end:
cmd = self.tools.samtools + " view " + bam_file + " | awk '"
- cmd += r'{ if (NR%2==1) print "@"$1"/1\n"$10"\n+\n"$11' + fq1_target + ';'
- cmd += r' else print "@"$1"/2\n"$10"\n+\n"$11' + fq2_target + '; }'
+ cmd += r'{ if (NR%2==1) print "@"$1"/1\n"$10"\n+\n"$11' + fq1_target + ";"
+ cmd += r' else print "@"$1"/2\n"$10"\n+\n"$11' + fq2_target + "; }"
cmd += "'" # end the awk command
fq2 = None
cmd = self.tools.samtools + " view " + bam_file + " | awk '"
- cmd += r'{ print "@"$1"\n"$10"\n+\n"$11' + fq1_target + '; }'
+ cmd += r'{ print "@"$1"\n"$10"\n+\n"$11' + fq1_target + "; }"
cmd += "'"
return cmd, fq1, fq2
def bam_to_fastq_bedtools(self, bam_file, out_fastq_pre, paired_end):
Converts bam to fastq; A version using bedtools
@@ -252,14 +256,20 @@ def bam_to_fastq_bedtools(self, bam_file, out_fastq_pre, paired_end):
fq1 = out_fastq_pre + "_R1.fastq"
fq2 = None
- cmd = self.tools.bedtools + " bamtofastq -i " + bam_file + " -fq " + fq1 + ".fastq"
+ cmd = (
+ self.tools.bedtools
+ + " bamtofastq -i "
+ + bam_file
+ + " -fq "
+ + fq1
+ + ".fastq"
+ )
if paired_end:
fq2 = out_fastq_pre + "_R2.fastq"
cmd += " -fq2 " + fq2
return cmd, fq1, fq2
def get_input_ext(self, input_file):
Get the extension of the input_file. Assumes you're using either
@@ -272,12 +282,13 @@ def get_input_ext(self, input_file):
elif input_file.endswith(".fastq") or input_file.endswith(".fq"):
input_ext = ".fastq"
- errmsg = "'{}'; this pipeline can only deal with .bam, .fastq, " \
- "or .fastq.gz files".format(input_file)
+ errmsg = (
+ "'{}'; this pipeline can only deal with .bam, .fastq, "
+ "or .fastq.gz files".format(input_file)
+ )
raise UnsupportedFiletypeException(errmsg)
return input_ext
def merge_or_link(self, input_args, raw_folder, local_base="sample"):
Standardizes various input possibilities by converting either .bam,
@@ -312,8 +323,7 @@ class of inputs (which can in turn be a string or a list).
local_base_extended = local_base
if input_arg:
- out = self.merge_or_link(
- input_arg, raw_folder, local_base_extended)
+ out = self.merge_or_link(input_arg, raw_folder, local_base_extended)
print("Local input file: '{}'".format(out))
# Make sure file exists:
@@ -343,7 +353,8 @@ class of inputs (which can in turn be a string or a list).
"ln -sf " + input_arg + " " + local_input_abs,
- shell=True)
+ shell=True,
+ )
# return the local (linked) filename absolute path
return local_input_abs
@@ -365,11 +376,11 @@ class of inputs (which can in turn be a string or a list).
if all([self.get_input_ext(x) == ".fastq.gz" for x in input_args]):
sample_merged_gz = local_base + ".merged.fastq.gz"
output_merge_gz = os.path.join(raw_folder, sample_merged_gz)
- #cmd1 = self.ziptool + "-d -c " + " ".join(input_args) + " > " + output_merge
- #cmd2 = self.ziptool + " " + output_merge
- #self.pm.run([cmd1, cmd2], output_merge_gz)
+ # cmd1 = self.ziptool + "-d -c " + " ".join(input_args) + " > " + output_merge
+ # cmd2 = self.ziptool + " " + output_merge
+ # self.pm.run([cmd1, cmd2], output_merge_gz)
# you can save yourself the decompression/recompression:
- cmd = "cat " + " ".join(input_args) + " > " + output_merge_gz
+ cmd = "cat " + " ".join(input_args) + " > " + output_merge_gz
self.pm.run(cmd, output_merge_gz)
return output_merge_gz
@@ -383,13 +394,20 @@ class of inputs (which can in turn be a string or a list).
# At this point, we don't recognize the input file types or they
# do not match.
raise NotImplementedError(
- "Input files must be of the same type, and can only "
- "merge bam or fastq.")
+ "Input files must be of the same type, and can only "
+ "merge bam or fastq."
+ )
def input_to_fastq(
- self, input_file, sample_name, paired_end, fastq_folder,
- output_file=None, multiclass=False, zipmode=False):
+ self,
+ input_file,
+ sample_name,
+ paired_end,
+ fastq_folder,
+ output_file=None,
+ multiclass=False,
+ zipmode=False,
+ ):
Builds a command to convert input file to fastq, for various inputs.
@@ -424,10 +442,15 @@ def input_to_fastq(
output_file = []
for in_i, in_arg in enumerate(input_file):
output = fastq_prefix + "_R" + str(in_i + 1) + ".fastq"
- result_cmd, uf, result_file = \
- self.input_to_fastq(in_arg, sample_name, paired_end,
- fastq_folder, output, multiclass=True,
- zipmode=zipmode)
+ result_cmd, uf, result_file = self.input_to_fastq(
+ in_arg,
+ sample_name,
+ paired_end,
+ fastq_folder,
+ output,
+ multiclass=True,
+ zipmode=zipmode,
+ )
@@ -444,8 +467,10 @@ def input_to_fastq(
if input_ext == ".bam":
print("Found .bam file")
- #cmd = self.bam_to_fastq(input_file, fastq_prefix, paired_end)
- cmd, fq1, fq2 = self.bam_to_fastq_awk(input_file, fastq_prefix, paired_end, zipmode)
+ # cmd = self.bam_to_fastq(input_file, fastq_prefix, paired_end)
+ cmd, fq1, fq2 = self.bam_to_fastq_awk(
+ input_file, fastq_prefix, paired_end, zipmode
+ )
# pm.run(cmd, output_file, follow=check_fastq)
if fq2:
output_file = [fq1, fq2]
@@ -455,20 +480,24 @@ def input_to_fastq(
print("Found .fastq.gz file")
if paired_end and not multiclass:
if zipmode:
- raise NotImplementedError("Can't use zipmode on interleaved fastq data.")
+ raise NotImplementedError(
+ "Can't use zipmode on interleaved fastq data."
+ )
# For paired-end reads in one fastq file, we must split the
# file into 2. The pipeline author will need to include this
- # python script in the scripts directory.
+ # python script in the scripts directory.
# TODO: make this self-contained in pypiper. This is a rare
# use case these days, as fastq files are almost never
# interleaved anymore.
- script_path = os.path.join(
- self.tools.scripts_dir, "fastq_split.py")
+ script_path = os.path.join(self.tools.scripts_dir, "fastq_split.py")
cmd = self.tools.python + " -u " + script_path
cmd += " -i " + input_file
cmd += " -o " + fastq_prefix
# Must also return the set of output files
- output_file = [fastq_prefix + "_R1.fastq", fastq_prefix + "_R2.fastq"]
+ output_file = [
+ fastq_prefix + "_R1.fastq",
+ fastq_prefix + "_R2.fastq",
+ ]
if zipmode:
# we do nothing!
@@ -477,7 +506,9 @@ def input_to_fastq(
# For single-end reads, we just unzip the fastq.gz file.
# or, paired-end reads that were already split.
- cmd = self.ziptool + " -d -c " + input_file + " > " + output_file
+ cmd = (
+ self.ziptool + " -d -c " + input_file + " > " + output_file
+ )
# a non-shell version
# cmd1 = "gunzip --force " + input_file
# cmd2 = "mv " + os.path.splitext(input_file)[0] + " " + output_file
@@ -491,7 +522,6 @@ def input_to_fastq(
return [cmd, fastq_prefix, output_file]
def check_fastq(self, input_files, output_files, paired_end):
Returns a follow sanity-check function to be run after a fastq conversion.
@@ -510,9 +540,9 @@ def check_fastq(self, input_files, output_files, paired_end):
# This is AFTER merge, so if there are multiple files it means the
# files were split into read1/read2; therefore I must divide by number
# of files for final reads.
- def temp_func(input_files=input_files, output_files=output_files,
- paired_end=paired_end):
+ def temp_func(
+ input_files=input_files, output_files=output_files, paired_end=paired_end
+ ):
if type(input_files) != list:
input_files = [input_files]
if type(output_files) != list:
@@ -521,35 +551,45 @@ def temp_func(input_files=input_files, output_files=output_files,
n_input_files = len(list(filter(bool, input_files)))
n_output_files = len(list(filter(bool, output_files)))
- total_reads = sum([int(self.count_reads(input_file, paired_end))
- for input_file in input_files])
+ total_reads = sum(
+ [
+ int(self.count_reads(input_file, paired_end))
+ for input_file in input_files
+ ]
+ )
raw_reads = int(total_reads / n_input_files)
- self.pm.report_result("Raw_reads", str(raw_reads))
+ self.pm.pipestat.report(values={"Raw_reads": str(raw_reads)})
total_fastq_reads = sum(
- [int(self.count_reads(output_file, paired_end))
- for output_file in output_files])
+ [
+ int(self.count_reads(output_file, paired_end))
+ for output_file in output_files
+ ]
+ )
fastq_reads = int(total_fastq_reads / n_output_files)
- self.pm.report_result("Fastq_reads", fastq_reads)
+ self.pm.pipestat.report(values={"Fastq_reads": fastq_reads})
input_ext = self.get_input_ext(input_files[0])
# We can only assess pass filter reads in bam files with flags.
if input_ext == ".bam":
num_failed_filter = sum(
- [int(self.count_fail_reads(f, paired_end))
- for f in input_files])
+ [int(self.count_fail_reads(f, paired_end)) for f in input_files]
+ )
pf_reads = int(raw_reads) - num_failed_filter
- self.pm.report_result("PF_reads", str(pf_reads))
+ self.pm.pipestat.report(values={"PF_reads": str(pf_reads)})
if fastq_reads != int(raw_reads):
- raise Exception("Fastq conversion error? Number of input reads "
- "doesn't number of output reads.")
+ raise Exception(
+ "Fastq conversion error? Number of input reads "
+ "doesn't number of output reads."
+ )
return fastq_reads
return temp_func
- def check_trim(self, trimmed_fastq, paired_end, trimmed_fastq_R2=None, fastqc_folder=None):
+ def check_trim(
+ self, trimmed_fastq, paired_end, trimmed_fastq_R2=None, fastqc_folder=None
+ ):
Build function to evaluate read trimming, and optionally run fastqc.
@@ -567,21 +607,21 @@ def check_trim(self, trimmed_fastq, paired_end, trimmed_fastq_R2=None, fastqc_fo
def temp_func():
print("Evaluating read trimming")
if paired_end and not trimmed_fastq_R2:
print("WARNING: specified paired-end but no R2 file")
n_trim = float(self.count_reads(trimmed_fastq, paired_end))
- self.pm.report_result("Trimmed_reads", int(n_trim))
+ self.pm.pipestat.report(values={"Trimmed_reads": int(n_trim)})
- rr = float(self.pm.get_stat("Raw_reads"))
+ rr = float(self.pm.pipestat.retrieve("Raw_reads"))
print("Can't calculate trim loss rate without raw read result.")
- "Trim_loss_rate", round((rr - n_trim) * 100 / rr, 2))
+ "Trim_loss_rate", round((rr - n_trim) * 100 / rr, 2)
+ )
# Also run a fastqc (if installed/requested)
if fastqc_folder:
@@ -591,18 +631,31 @@ def temp_func():
self.pm.run(cmd, lock_name="trimmed_fastqc", nofail=True)
fname, ext = os.path.splitext(os.path.basename(trimmed_fastq))
fastqc_html = os.path.join(fastqc_folder, fname + "_fastqc.html")
- self.pm.report_object("FastQC report r1", fastqc_html)
+ self.pm.pipestat.report(
+ values={
+ "FastQC_report_R1": {
+ "path": fastqc_html,
+ "title": "FastQC report R1",
+ }
+ }
+ )
if paired_end and trimmed_fastq_R2:
cmd = self.fastqc(trimmed_fastq_R2, fastqc_folder)
self.pm.run(cmd, lock_name="trimmed_fastqc_R2", nofail=True)
fname, ext = os.path.splitext(os.path.basename(trimmed_fastq_R2))
fastqc_html = os.path.join(fastqc_folder, fname + "_fastqc.html")
- self.pm.report_object("FastQC report r2", fastqc_html)
+ self.pm.pipestat.report(
+ values={
+ "FastQC_report_R2": {
+ "path": fastqc_html,
+ "title": "FastQC report R2",
+ }
+ }
+ )
return temp_func
def validate_bam(self, input_bam):
Wrapper for Picard's ValidateSamFile.
@@ -615,7 +668,6 @@ def validate_bam(self, input_bam):
cmd += " INPUT=" + input_bam
return cmd
def merge_bams(self, input_bams, merged_bam, in_sorted="TRUE", tmp_dir=None):
Combine multiple files into one.
@@ -653,27 +705,25 @@ def merge_bams(self, input_bams, merged_bam, in_sorted="TRUE", tmp_dir=None):
cmd += " TMP_DIR=" + tmp_dir
return cmd
def merge_bams_samtools(self, input_bams, merged_bam):
- cmd = self.tools.samtools + " merge -f "
+ cmd = self.tools.samtools + " merge -f "
cmd += " -@ " + str(self.pm.cores)
- cmd += " " + merged_bam + " "
+ cmd += " " + merged_bam + " "
cmd += " ".join(input_bams)
return cmd
def merge_fastq(self, inputs, output, run=False, remove_inputs=False):
Merge FASTQ files (zipped or not) into one.
:param Iterable[str] inputs: Collection of paths to files to merge.
:param str output: Path to single output file.
:param bool run: Whether to run the command.
:param bool remove_inputs: Whether to keep the original files.
- :return NoneType | str: Null if running the command, otherwise the
+ :return NoneType | str: Null if running the command, otherwise the
command itself
- :raise ValueError: Raise ValueError if the call is such that
+ :raise ValueError: Raise ValueError if the call is such that
inputs are to be deleted but command is not run.
if remove_inputs and not run:
@@ -687,14 +737,16 @@ def merge_fastq(self, inputs, output, run=False, remove_inputs=False):
return cmd
def count_lines(self, file_name):
Uses the command-line utility wc to count the number of lines in a file. For MacOS, must strip leading whitespace from wc.
:param str file_name: name of file whose lines are to be counted
- x = subprocess.check_output("wc -l " + file_name + " | sed -E 's/^[[:space:]]+//' | cut -f1 -d' '", shell=True)
+ x = subprocess.check_output(
+ "wc -l " + file_name + " | sed -E 's/^[[:space:]]+//' | cut -f1 -d' '",
+ shell=True,
+ )
return x.decode().strip()
def count_lines_zip(self, file_name):
@@ -703,7 +755,13 @@ def count_lines_zip(self, file_name):
For compressed files.
:param file: file_name
- x = subprocess.check_output(self.ziptool + " -d -c " + file_name + " | wc -l | sed -E 's/^[[:space:]]+//' | cut -f1 -d' '", shell=True)
+ x = subprocess.check_output(
+ self.ziptool
+ + " -d -c "
+ + file_name
+ + " | wc -l | sed -E 's/^[[:space:]]+//' | cut -f1 -d' '",
+ shell=True,
+ )
return x.decode().strip()
def get_chrs_from_bam(self, file_name):
@@ -711,7 +769,13 @@ def get_chrs_from_bam(self, file_name):
Uses samtools to grab the chromosomes from the header that are contained
in this bam file.
- x = subprocess.check_output(self.tools.samtools + " view -H " + file_name + " | grep '^@SQ' | cut -f2| sed s'/SN://'", shell=True)
+ x = subprocess.check_output(
+ self.tools.samtools
+ + " view -H "
+ + file_name
+ + " | grep '^@SQ' | cut -f2| sed s'/SN://'",
+ shell=True,
+ )
# Chromosomes will be separated by newlines; split into list to return
return x.decode().split()
@@ -735,14 +799,25 @@ def count_unique_reads(self, file_name, paired_end):
if file_name.endswith("bam"):
param = ""
if paired_end:
- r1 = self.samtools_view(file_name, param=param + " -f64", postpend=" | cut -f1 | sort -k1,1 -u | wc -l | sed -E 's/^[[:space:]]+//'")
- r2 = self.samtools_view(file_name, param=param + " -f128", postpend=" | cut -f1 | sort -k1,1 -u | wc -l | sed -E 's/^[[:space:]]+//'")
+ r1 = self.samtools_view(
+ file_name,
+ param=param + " -f64",
+ postpend=" | cut -f1 | sort -k1,1 -u | wc -l | sed -E 's/^[[:space:]]+//'",
+ )
+ r2 = self.samtools_view(
+ file_name,
+ param=param + " -f128",
+ postpend=" | cut -f1 | sort -k1,1 -u | wc -l | sed -E 's/^[[:space:]]+//'",
+ )
- r1 = self.samtools_view(file_name, param=param + "", postpend=" | cut -f1 | sort -k1,1 -u | wc -l | sed -E 's/^[[:space:]]+//'")
+ r1 = self.samtools_view(
+ file_name,
+ param=param + "",
+ postpend=" | cut -f1 | sort -k1,1 -u | wc -l | sed -E 's/^[[:space:]]+//'",
+ )
r2 = 0
return int(r1) + int(r2)
def count_unique_mapped_reads(self, file_name, paired_end):
For a bam or sam file with paired or or single-end reads, returns the
@@ -759,21 +834,32 @@ def count_unique_mapped_reads(self, file_name, paired_end):
if ext == ".sam":
param = "-S -F4"
- elif ext == "bam":
+ elif ext == ".bam":
param = "-F4"
raise ValueError("Not a SAM or BAM: '{}'".format(file_name))
- if paired_end:
- r1 = self.samtools_view(file_name, param=param + " -f64", postpend=" | cut -f1 | sort -k1,1 -u | wc -l | sed -E 's/^[[:space:]]+//'")
- r2 = self.samtools_view(file_name, param=param + " -f128", postpend=" | cut -f1 | sort -k1,1 -u | wc -l | sed -E 's/^[[:space:]]+//'")
+ if paired_end:
+ r1 = self.samtools_view(
+ file_name,
+ param=param + " -f64",
+ postpend=" | cut -f1 | sort -k1,1 -u | wc -l | sed -E 's/^[[:space:]]+//'",
+ )
+ r2 = self.samtools_view(
+ file_name,
+ param=param + " -f128",
+ postpend=" | cut -f1 | sort -k1,1 -u | wc -l | sed -E 's/^[[:space:]]+//'",
+ )
- r1 = self.samtools_view(file_name, param=param + "", postpend=" | cut -f1 | sort -k1,1 -u | wc -l | sed -E 's/^[[:space:]]+//'")
+ r1 = self.samtools_view(
+ file_name,
+ param=param + "",
+ postpend=" | cut -f1 | sort -k1,1 -u | wc -l | sed -E 's/^[[:space:]]+//'",
+ )
r2 = 0
return int(r1) + int(r2)
def count_flag_reads(self, file_name, flag, paired_end):
Counts the number of reads with the specified flag.
@@ -791,7 +877,6 @@ def count_flag_reads(self, file_name, flag, paired_end):
param += " -S"
return self.samtools_view(file_name, param=param)
def count_multimapping_reads(self, file_name, paired_end):
Counts the number of reads that mapped to multiple locations. Warning:
@@ -807,7 +892,6 @@ def count_multimapping_reads(self, file_name, paired_end):
return int(self.count_flag_reads(file_name, 256, paired_end))
def count_uniquelymapping_reads(self, file_name, paired_end):
Counts the number of reads that mapped to a unique position.
@@ -820,7 +904,6 @@ def count_uniquelymapping_reads(self, file_name, paired_end):
param += " -S"
return self.samtools_view(file_name, param=param)
def count_fail_reads(self, file_name, paired_end):
Counts the number of reads that failed platform/vendor quality checks.
@@ -831,7 +914,6 @@ def count_fail_reads(self, file_name, paired_end):
return int(self.count_flag_reads(file_name, 512, paired_end))
def samtools_view(self, file_name, param, postpend=""):
Run samtools view, with flexible parameters and post-processing.
@@ -843,13 +925,11 @@ def samtools_view(self, file_name, param, postpend=""):
:param str postpend: String to append to the samtools command;
useful to add cut, sort, wc operations to the samtools view output.
- cmd = "{} view {} {} {}".format(
- self.tools.samtools, param, file_name, postpend)
+ cmd = "{} view {} {} {}".format(self.tools.samtools, param, file_name, postpend)
# in python 3, check_output returns a byte string which causes issues.
# with python 3.6 we could use argument: "encoding='UTF-8'""
return subprocess.check_output(cmd, shell=True).decode().strip()
def count_reads(self, file_name, paired_end):
Count reads in a file.
@@ -874,13 +954,14 @@ def count_reads(self, file_name, paired_end):
param_text = "-c" if ext == ".bam" else "-c -S"
return self.samtools_view(file_name, param=param_text)
- num_lines = self.count_lines_zip(file_name) \
- if is_gzipped_fastq(file_name) \
- else self.count_lines(file_name)
+ num_lines = (
+ self.count_lines_zip(file_name)
+ if is_gzipped_fastq(file_name)
+ else self.count_lines(file_name)
+ )
divisor = 2 if paired_end else 4
return int(num_lines) / divisor
def count_concordant(self, aligned_bam):
Count only reads that "aligned concordantly exactly 1 time."
@@ -889,9 +970,8 @@ def count_concordant(self, aligned_bam):
cmd = self.tools.samtools + " view " + aligned_bam + " | "
cmd += "grep 'YT:Z:CP'" + " | uniq -u | wc -l | sed -E 's/^[[:space:]]+//'"
- return subprocess.check_output(cmd, shell=True).decode().strip()
+ return subprocess.check_output(cmd, shell=True).decode().strip()
def count_mapped_reads(self, file_name, paired_end):
@@ -912,35 +992,84 @@ def count_mapped_reads(self, file_name, paired_end):
return self.samtools_view(file_name, param="-c -F4 -S")
return -1
def sam_conversions(self, sam_file, depth=True):
Convert sam files to bam files, then sort and index them for later use.
:param bool depth: also calculate coverage over each position
- cmd = self.tools.samtools + " view -bS " + sam_file + " > " + sam_file.replace(".sam", ".bam") + "\n"
- cmd += self.tools.samtools + " sort " + sam_file.replace(".sam", ".bam") + " -o " + sam_file.replace(".sam", "_sorted.bam") + "\n"
- cmd += self.tools.samtools + " index " + sam_file.replace(".sam", "_sorted.bam") + "\n"
+ cmd = (
+ self.tools.samtools
+ + " view -bS "
+ + sam_file
+ + " > "
+ + sam_file.replace(".sam", ".bam")
+ + "\n"
+ )
+ cmd += (
+ self.tools.samtools
+ + " sort "
+ + sam_file.replace(".sam", ".bam")
+ + " -o "
+ + sam_file.replace(".sam", "_sorted.bam")
+ + "\n"
+ )
+ cmd += (
+ self.tools.samtools
+ + " index "
+ + sam_file.replace(".sam", "_sorted.bam")
+ + "\n"
+ )
if depth:
- cmd += self.tools.samtools + " depth " + sam_file.replace(".sam", "_sorted.bam") + " > " + sam_file.replace(".sam", "_sorted.depth") + "\n"
+ cmd += (
+ self.tools.samtools
+ + " depth "
+ + sam_file.replace(".sam", "_sorted.bam")
+ + " > "
+ + sam_file.replace(".sam", "_sorted.depth")
+ + "\n"
+ )
return cmd
def bam_conversions(self, bam_file, depth=True):
Sort and index bam files for later use.
:param bool depth: also calculate coverage over each position
- cmd = self.tools.samtools + " view -h " + bam_file + " > " + bam_file.replace(".bam", ".sam") + "\n"
- cmd += self.tools.samtools + " sort " + bam_file + " -o " + bam_file.replace(".bam", "_sorted.bam") + "\n"
- cmd += self.tools.samtools + " index " + bam_file.replace(".bam", "_sorted.bam") + "\n"
+ cmd = (
+ self.tools.samtools
+ + " view -h "
+ + bam_file
+ + " > "
+ + bam_file.replace(".bam", ".sam")
+ + "\n"
+ )
+ cmd += (
+ self.tools.samtools
+ + " sort "
+ + bam_file
+ + " -o "
+ + bam_file.replace(".bam", "_sorted.bam")
+ + "\n"
+ )
+ cmd += (
+ self.tools.samtools
+ + " index "
+ + bam_file.replace(".bam", "_sorted.bam")
+ + "\n"
+ )
if depth:
- cmd += self.tools.samtools + " depth " + bam_file.replace(".bam", "_sorted.bam") + " > " + bam_file.replace(".bam", "_sorted.depth") + "\n"
+ cmd += (
+ self.tools.samtools
+ + " depth "
+ + bam_file.replace(".bam", "_sorted.bam")
+ + " > "
+ + bam_file.replace(".bam", "_sorted.depth")
+ + "\n"
+ )
return cmd
def fastqc(self, file, output_dir):
Create command to run fastqc on a FASTQ file
@@ -959,9 +1088,9 @@ def fastqc(self, file, output_dir):
if not os.path.isabs(output_dir) and pm is not None:
output_dir = os.path.join(pm.outfolder, output_dir)
- return "{} --noextract --outdir {} {}".\
- format(self.tools.fastqc, output_dir, file)
+ return "{} --noextract --outdir {} {}".format(
+ self.tools.fastqc, output_dir, file
+ )
def fastqc_rename(self, input_bam, output_dir, sample_name):
@@ -984,20 +1113,29 @@ def fastqc_rename(self, input_bam, output_dir, sample_name):
cmd1 = self.fastqc(input_bam, output_dir)
cmd2 = "if [[ ! -s {1}_fastqc.html ]]; then mv {0}_fastqc.html {1}_fastqc.html; mv {0}_fastqc.zip {1}_fastqc.zip; fi".format(
- os.path.join(output_dir, initial), os.path.join(output_dir, sample_name))
+ os.path.join(output_dir, initial), os.path.join(output_dir, sample_name)
+ )
return cmds
def samtools_index(self, bam_file):
"""Index a bam file."""
cmd = self.tools.samtools + " index {0}".format(bam_file)
return cmd
def slurm_header(
- self, job_name, output, queue="shortq", n_tasks=1, time="10:00:00",
- cpus_per_task=8, mem_per_cpu=2000, nodes=1, user_mail="", mail_type="end"):
+ self,
+ job_name,
+ output,
+ queue="shortq",
+ n_tasks=1,
+ time="10:00:00",
+ cpus_per_task=8,
+ mem_per_cpu=2000,
+ nodes=1,
+ user_mail="",
+ mail_type="end",
+ ):
cmd = """ #!/bin/bash
#SBATCH --partition={0}
#SBATCH --ntasks={1}
@@ -1018,51 +1156,65 @@ def slurm_header(
- queue, n_tasks, time, cpus_per_task, mem_per_cpu,
- nodes, job_name, output, mail_type, user_mail)
+ queue,
+ n_tasks,
+ time,
+ cpus_per_task,
+ mem_per_cpu,
+ nodes,
+ job_name,
+ output,
+ mail_type,
+ user_mail,
+ )
return cmd
def slurm_footer(self):
return " date"
def slurm_submit_job(self, job_file):
return os.system("sbatch %s" % job_file)
def remove_file(self, file_name):
return "rm {0}".format(file_name)
def move_file(self, old, new):
return "mv {0} {1}".format(old, new)
def preseq_curve(self, bam_file, output_prefix):
return """
preseq c_curve -B -P -o {0}.yield.txt {1}
- """.format(output_prefix, bam_file)
+ """.format(
+ output_prefix, bam_file
+ )
def preseq_extrapolate(self, bam_file, output_prefix):
return """
preseq lc_extrap -v -B -P -e 1e+9 -o {0}.future_yield.txt {1}
- """.format(output_prefix, bam_file)
+ """.format(
+ output_prefix, bam_file
+ )
def preseq_coverage(self, bam_file, output_prefix):
return """
preseq gc_extrap -o {0}.future_coverage.txt {1}
- """.format(output_prefix, bam_file)
+ """.format(
+ output_prefix, bam_file
+ )
def trimmomatic(
- self, input_fastq1, output_fastq1, cpus, adapters, log,
- input_fastq2=None, output_fastq1_unpaired=None,
- output_fastq2=None, output_fastq2_unpaired=None):
+ self,
+ input_fastq1,
+ output_fastq1,
+ cpus,
+ adapters,
+ log,
+ input_fastq2=None,
+ output_fastq1_unpaired=None,
+ output_fastq2=None,
+ output_fastq2_unpaired=None,
+ ):
PE = False if input_fastq2 is None else True
pe = "PE" if PE else "SE"
cmd = self.tools.java + " -Xmx" + self.pm.javamem
@@ -1072,17 +1224,26 @@ def trimmomatic(
cmd += " {0}".format(input_fastq2)
cmd += " {0}".format(output_fastq1)
if PE:
- cmd += " {0} {1} {2}".format(output_fastq1_unpaired, output_fastq2, output_fastq2_unpaired)
+ cmd += " {0} {1} {2}".format(
+ output_fastq1_unpaired, output_fastq2, output_fastq2_unpaired
+ )
cmd += " ILLUMINACLIP:{0}:1:40:15:8:true".format(adapters)
cmd += " LEADING:3 TRAILING:3"
cmd += " SLIDINGWINDOW:4:10"
cmd += " MINLEN:36"
return cmd
def skewer(
- self, input_fastq1, output_prefix, output_fastq1,
- log, cpus, adapters, input_fastq2=None, output_fastq2=None):
+ self,
+ input_fastq1,
+ output_prefix,
+ output_fastq1,
+ log,
+ cpus,
+ adapters,
+ input_fastq2=None,
+ output_fastq2=None,
+ ):
Create commands with which to run skewer.
@@ -1117,17 +1278,33 @@ def skewer(
cmd2 = "mv {0} {1}".format(output_prefix + "-trimmed.fastq", output_fastq1)
- cmd2 = "mv {0} {1}".format(output_prefix + "-trimmed-pair1.fastq", output_fastq1)
+ cmd2 = "mv {0} {1}".format(
+ output_prefix + "-trimmed-pair1.fastq", output_fastq1
+ )
- cmd3 = "mv {0} {1}".format(output_prefix + "-trimmed-pair2.fastq", output_fastq2)
+ cmd3 = "mv {0} {1}".format(
+ output_prefix + "-trimmed-pair2.fastq", output_fastq2
+ )
cmd4 = "mv {0} {1}".format(output_prefix + "-trimmed.log", log)
return cmds
- def bowtie2_map(self, input_fastq1, output_bam, log, metrics, genome_index, max_insert, cpus, input_fastq2=None):
+ def bowtie2_map(
+ self,
+ input_fastq1,
+ output_bam,
+ log,
+ metrics,
+ genome_index,
+ max_insert,
+ cpus,
+ input_fastq2=None,
+ ):
# Admits 2000bp-long fragments (--maxins option)
- cmd = self.tools.bowtie2 + " --very-sensitive --no-discordant -p {0}".format(cpus)
+ cmd = self.tools.bowtie2 + " --very-sensitive --no-discordant -p {0}".format(
+ cpus
+ )
cmd += " -x {0}".format(genome_index)
cmd += " --met-file {0}".format(metrics)
if input_fastq2 is None:
@@ -1136,15 +1313,24 @@ def bowtie2_map(self, input_fastq1, output_bam, log, metrics, genome_index, max_
cmd += " --maxins {0}".format(max_insert)
cmd += " -1 {0}".format(input_fastq1)
cmd += " -2 {0}".format(input_fastq2)
- cmd += " 2> {0} | samtools view -S -b - | samtools sort -o {1} -".format(log, output_bam)
+ cmd += " 2> {0} | samtools view -S -b - | samtools sort -o {1} -".format(
+ log, output_bam
+ )
return cmd
def topHat_map(self, input_fastq, output_dir, genome, transcriptome, cpus):
# Allow paired input
- cmd = self.tools.tophat + " --GTF {0} --b2-L 15 --library-type fr-unstranded --mate-inner-dist 120".format(transcriptome)
+ cmd = (
+ self.tools.tophat
+ + " --GTF {0} --b2-L 15 --library-type fr-unstranded --mate-inner-dist 120".format(
+ transcriptome
+ )
+ )
cmd += " --max-multihits 100 --no-coverage-search"
- cmd += " --num-threads {0} --output-dir {1} {2} {3}".format(cpus, output_dir, genome, input_fastq)
+ cmd += " --num-threads {0} --output-dir {1} {2} {3}".format(
+ cpus, output_dir, genome, input_fastq
+ )
return cmd
def picard_mark_duplicates(self, input_bam, output_bam, metrics_file, temp_dir="."):
@@ -1164,33 +1350,50 @@ def picard_mark_duplicates(self, input_bam, output_bam, metrics_file, temp_dir="
return [cmd1, cmd2, cmd3]
def sambamba_remove_duplicates(self, input_bam, output_bam, cpus=16):
- cmd = self.tools.sambamba + " markdup -t {0} -r {1} {2}".format(cpus, input_bam, output_bam)
+ cmd = self.tools.sambamba + " markdup -t {0} -r {1} {2}".format(
+ cpus, input_bam, output_bam
+ )
return cmd
def get_mitochondrial_reads(self, bam_file, output, cpus=4):
- """
- """
+ """ """
tmp_bam = bam_file + "tmp_rmMe"
cmd1 = self.tools.sambamba + " index -t {0} {1}".format(cpus, bam_file)
- cmd2 = self.tools.sambamba + " slice {0} chrM | {1} markdup -t 4 /dev/stdin {2} 2> {3}".format(bam_file, self.tools.sambamba, tmp_bam, output)
+ cmd2 = (
+ self.tools.sambamba
+ + " slice {0} chrM | {1} markdup -t 4 /dev/stdin {2} 2> {3}".format(
+ bam_file, self.tools.sambamba, tmp_bam, output
+ )
+ )
cmd3 = "rm {}".format(tmp_bam)
return [cmd1, cmd2, cmd3]
- def filter_reads(self, input_bam, output_bam, metrics_file, paired=False, cpus=16, Q=30):
+ def filter_reads(
+ self, input_bam, output_bam, metrics_file, paired=False, cpus=16, Q=30
+ ):
Remove duplicates, filter for >Q, remove multiple mapping reads.
For paired-end reads, keep only proper pairs.
nodups = re.sub("\.bam$", "", output_bam) + ".nodups.nofilter.bam"
- cmd1 = self.tools.sambamba + " markdup -t {0} -r --compression-level=0 {1} {2} 2> {3}".format(cpus, input_bam, nodups, metrics_file)
- cmd2 = self.tools.sambamba + ' view -t {0} -f bam --valid'.format(cpus)
+ cmd1 = (
+ self.tools.sambamba
+ + " markdup -t {0} -r --compression-level=0 {1} {2} 2> {3}".format(
+ cpus, input_bam, nodups, metrics_file
+ )
+ )
+ cmd2 = self.tools.sambamba + " view -t {0} -f bam --valid".format(cpus)
if paired:
cmd2 += ' -F "not (unmapped or mate_is_unmapped) and proper_pair'
cmd2 += ' -F "not unmapped'
- cmd2 += ' and not (secondary_alignment or supplementary) and mapping_quality >= {0}"'.format(Q)
- cmd2 += ' {0} |'.format(nodups)
- cmd2 += self.tools.sambamba + " sort -t {0} /dev/stdin -o {1}".format(cpus, output_bam)
+ cmd2 += ' and not (secondary_alignment or supplementary) and mapping_quality >= {0}"'.format(
+ Q
+ )
+ cmd2 += " {0} |".format(nodups)
+ cmd2 += self.tools.sambamba + " sort -t {0} /dev/stdin -o {1}".format(
+ cpus, output_bam
+ )
cmd3 = "if [[ -s {0} ]]; then rm {0}; fi".format(nodups)
cmd4 = "if [[ -s {0} ]]; then rm {0}; fi".format(nodups + ".bai")
return [cmd1, cmd2, cmd3, cmd4]
@@ -1203,7 +1406,6 @@ def shift_reads(self, input_bam, genome, output_bam):
cmd += " " + self.tools.samtools + " sort -o {0} -".format(output_bam)
return cmd
def sort_index_bam(self, input_bam, output_bam):
tmp_bam = re.sub("\.bam", ".sorted", input_bam)
cmd1 = self.tools.samtools + " sort {0} {1}".format(input_bam, tmp_bam)
@@ -1211,12 +1413,10 @@ def sort_index_bam(self, input_bam, output_bam):
cmd3 = self.tools.samtools + " index {0}".format(output_bam)
return [cmd1, cmd2, cmd3]
def index_bam(self, input_bam):
cmd = self.tools.samtools + " index {0}".format(input_bam)
return cmd
def run_spp(self, input_bam, output, plot, cpus):
Run the SPP read peak analysis tool.
@@ -1229,38 +1429,40 @@ def run_spp(self, input_bam, output, plot, cpus):
base = "{} {} -rf -savp".format(self.tools.Rscript, self.tools.spp)
cmd = base + " -savp={} -s=0:5:500 -c={} -out={} -p={}".format(
- plot, input_bam, output, cpus)
+ plot, input_bam, output, cpus
+ )
return cmd
def get_fragment_sizes(self, bam_file):
- import pysam
import numpy as np
+ import pysam
frag_sizes = list()
- bam = pysam.Samfile(bam_file, 'rb')
+ bam = pysam.Samfile(bam_file, "rb")
for read in bam:
if bam.getrname(read.tid) != "chrM" and read.tlen < 1500:
return np.array(frag_sizes)
- def plot_atacseq_insert_sizes(self, bam, plot, output_csv, max_insert=1500, smallest_insert=30):
+ def plot_atacseq_insert_sizes(
+ self, bam, plot, output_csv, max_insert=1500, smallest_insert=30
+ ):
Heavy inspiration from here:
- import pysam
- import numpy as np
+ import matplotlib
import matplotlib.mlab as mlab
- from scipy.optimize import curve_fit
+ import numpy as np
+ import pysam
from scipy.integrate import simps
- import matplotlib
- matplotlib.use('Agg')
+ from scipy.optimize import curve_fit
+ matplotlib.use("Agg")
import matplotlib.pyplot as plt
print("Necessary Python modules couldn't be loaded.")
@@ -1268,6 +1470,7 @@ def plot_atacseq_insert_sizes(self, bam, plot, output_csv, max_insert=1500, smal
import seaborn as sns
@@ -1275,7 +1478,7 @@ def plot_atacseq_insert_sizes(self, bam, plot, output_csv, max_insert=1500, smal
def get_fragment_sizes(bam, max_insert=1500):
frag_sizes = list()
- bam = pysam.Samfile(bam, 'rb')
+ bam = pysam.Samfile(bam, "rb")
for i, read in enumerate(bam):
if read.tlen < max_insert:
@@ -1293,11 +1496,13 @@ def mixture_function(x, *p):
nfr = expo(x, 2.9e-02, 2.8e-02)
nfr[:smallest_insert] = 0
- return (mlab.normpdf(x, m1, s1) * w1 +
- mlab.normpdf(x, m2, s2) * w2 +
- mlab.normpdf(x, m3, s3) * w3 +
- mlab.normpdf(x, m4, s4) * w4 +
- nfr)
+ return (
+ mlab.normpdf(x, m1, s1) * w1
+ + mlab.normpdf(x, m2, s2) * w2
+ + mlab.normpdf(x, m3, s3) * w3
+ + mlab.normpdf(x, m4, s4) * w4
+ + nfr
+ )
def expo(x, q, r):
@@ -1316,17 +1521,30 @@ def expo(x, q, r):
# Parameters are empirical, need to check
paramGuess = [
- 200, 50, 0.7, # gaussians
- 400, 50, 0.15,
- 600, 50, 0.1,
- 800, 55, 0.045,
- 2.9e-02, 2.8e-02 # exponential
+ 200,
+ 50,
+ 0.7, # gaussians
+ 400,
+ 50,
+ 0.15,
+ 600,
+ 50,
+ 0.1,
+ 800,
+ 55,
+ 0.045,
+ 2.9e-02,
+ 2.8e-02, # exponential
popt3, pcov3 = curve_fit(
- mixture_function, x[smallest_insert:], y[smallest_insert:],
- p0=paramGuess, maxfev=100000)
+ mixture_function,
+ x[smallest_insert:],
+ y[smallest_insert:],
+ p0=paramGuess,
+ maxfev=100000,
+ )
print("Nucleosomal fit could not be found.")
@@ -1340,19 +1558,19 @@ def expo(x, q, r):
plt.hist(frag_sizes, numBins, histtype="step", ec="k", normed=1, alpha=0.5)
# Plot nucleosomal fits
- plt.plot(x, mlab.normpdf(x, m1, s1) * w1, 'r-', lw=1.5, label="1st nucleosome")
- plt.plot(x, mlab.normpdf(x, m2, s2) * w2, 'g-', lw=1.5, label="2nd nucleosome")
- plt.plot(x, mlab.normpdf(x, m3, s3) * w3, 'b-', lw=1.5, label="3rd nucleosome")
- plt.plot(x, mlab.normpdf(x, m4, s4) * w4, 'c-', lw=1.5, label="4th nucleosome")
+ plt.plot(x, mlab.normpdf(x, m1, s1) * w1, "r-", lw=1.5, label="1st nucleosome")
+ plt.plot(x, mlab.normpdf(x, m2, s2) * w2, "g-", lw=1.5, label="2nd nucleosome")
+ plt.plot(x, mlab.normpdf(x, m3, s3) * w3, "b-", lw=1.5, label="3rd nucleosome")
+ plt.plot(x, mlab.normpdf(x, m4, s4) * w4, "c-", lw=1.5, label="4th nucleosome")
# Plot nucleosome-free fit
nfr = expo(x, 2.9e-02, 2.8e-02)
nfr[:smallest_insert] = 0
- plt.plot(x, nfr, 'k-', lw=1.5, label="nucleosome-free")
+ plt.plot(x, nfr, "k-", lw=1.5, label="nucleosome-free")
# Plot sum of fits
ys = mixture_function(x, *popt3)
- plt.plot(x, ys, 'k--', lw=3.5, label="fit sum")
+ plt.plot(x, ys, "k--", lw=3.5, label="fit sum")
plt.xlabel("Fragment size (bp)")
@@ -1363,10 +1581,26 @@ def expo(x, q, r):
areas = [
["fraction", "area under curve", "max density"],
["Nucleosome-free fragments", simps(nfr), max(nfr)],
- ["1st nucleosome", simps(mlab.normpdf(x, m1, s1) * w1), max(mlab.normpdf(x, m1, s1) * w1)],
- ["2nd nucleosome", simps(mlab.normpdf(x, m2, s2) * w1), max(mlab.normpdf(x, m2, s2) * w2)],
- ["3rd nucleosome", simps(mlab.normpdf(x, m3, s3) * w1), max(mlab.normpdf(x, m3, s3) * w3)],
- ["4th nucleosome", simps(mlab.normpdf(x, m4, s4) * w1), max(mlab.normpdf(x, m4, s4) * w4)]
+ [
+ "1st nucleosome",
+ simps(mlab.normpdf(x, m1, s1) * w1),
+ max(mlab.normpdf(x, m1, s1) * w1),
+ ],
+ [
+ "2nd nucleosome",
+ simps(mlab.normpdf(x, m2, s2) * w1),
+ max(mlab.normpdf(x, m2, s2) * w2),
+ ],
+ [
+ "3rd nucleosome",
+ simps(mlab.normpdf(x, m3, s3) * w1),
+ max(mlab.normpdf(x, m3, s3) * w3),
+ ],
+ [
+ "4th nucleosome",
+ simps(mlab.normpdf(x, m4, s4) * w1),
+ max(mlab.normpdf(x, m4, s4) * w4),
+ ],
@@ -1380,8 +1614,15 @@ def expo(x, q, r):
# TODO: parameterize in terms of normalization factor.
def bam_to_bigwig(
- self, input_bam, output_bigwig, genome_sizes, genome,
- tagmented=False, normalize=False, norm_factor=1000):
+ self,
+ input_bam,
+ output_bigwig,
+ genome_sizes,
+ genome,
+ tagmented=False,
+ normalize=False,
+ norm_factor=1000,
+ ):
Convert a BAM file to a bigWig file.
@@ -1401,34 +1642,63 @@ def bam_to_bigwig(
transient_file = os.path.abspath(re.sub("\.bigWig", "", output_bigwig))
cmd1 = self.tools.bedtools + " bamtobed -i {0} |".format(input_bam)
if not tagmented:
- cmd1 += " " + self.tools.bedtools + " slop -i stdin -g {0} -s -l 0 -r 130 |".format(genome_sizes)
+ cmd1 += (
+ " "
+ + self.tools.bedtools
+ + " slop -i stdin -g {0} -s -l 0 -r 130 |".format(genome_sizes)
+ )
cmd1 += " fix_bedfile_genome_boundaries.py {0} |".format(genome)
- cmd1 += " " + self.tools.genomeCoverageBed + " {0}-bg -g {1} -i stdin > {2}.cov".format(
- "-5 " if tagmented else "",
- genome_sizes,
- transient_file
+ cmd1 += (
+ " "
+ + self.tools.genomeCoverageBed
+ + " {0}-bg -g {1} -i stdin > {2}.cov".format(
+ "-5 " if tagmented else "", genome_sizes, transient_file
+ )
if normalize:
- cmds.append("""awk 'NR==FNR{{sum+= $4; next}}{{ $4 = ($4 / sum) * {1}; print}}' {0}.cov {0}.cov | sort -k1,1 -k2,2n > {0}.normalized.cov""".format(transient_file, norm_factor))
- cmds.append(self.tools.bedGraphToBigWig + " {0}{1}.cov {2} {3}".format(transient_file, ".normalized" if normalize else "", genome_sizes, output_bigwig))
+ cmds.append(
+ """awk 'NR==FNR{{sum+= $4; next}}{{ $4 = ($4 / sum) * {1}; print}}' {0}.cov {0}.cov | sort -k1,1 -k2,2n > {0}.normalized.cov""".format(
+ transient_file, norm_factor
+ )
+ )
+ cmds.append(
+ self.tools.bedGraphToBigWig
+ + " {0}{1}.cov {2} {3}".format(
+ transient_file,
+ ".normalized" if normalize else "",
+ genome_sizes,
+ output_bigwig,
+ )
+ )
# remove tmp files
cmds.append("if [[ -s {0}.cov ]]; then rm {0}.cov; fi".format(transient_file))
if normalize:
- cmds.append("if [[ -s {0}.normalized.cov ]]; then rm {0}.normalized.cov; fi".format(transient_file))
+ cmds.append(
+ "if [[ -s {0}.normalized.cov ]]; then rm {0}.normalized.cov; fi".format(
+ transient_file
+ )
+ )
cmds.append("chmod 755 {0}".format(output_bigwig))
return cmds
- def add_track_to_hub(self, sample_name, track_url, track_hub, colour, five_prime=""):
- cmd1 = """echo "track type=bigWig name='{0} {1}' description='{0} {1}'""".format(sample_name, five_prime)
- cmd1 += """ height=32 visibility=full maxHeightPixels=32:32:25 bigDataUrl={0} color={1}" >> {2}""".format(track_url, colour, track_hub)
+ def add_track_to_hub(
+ self, sample_name, track_url, track_hub, colour, five_prime=""
+ ):
+ cmd1 = (
+ """echo "track type=bigWig name='{0} {1}' description='{0} {1}'""".format(
+ sample_name, five_prime
+ )
+ )
+ cmd1 += """ height=32 visibility=full maxHeightPixels=32:32:25 bigDataUrl={0} color={1}" >> {2}""".format(
+ track_url, colour, track_hub
+ )
cmd2 = "chmod 755 {0}".format(track_hub)
return [cmd1, cmd2]
def link_to_track_hub(self, track_hub_url, file_name, genome):
import textwrap
db = "org" if genome == "hg19" else "db" # different database call for human
genome = "human" if genome == "hg19" else genome # change hg19 to human
html = """
@@ -1438,35 +1708,56 @@ def link_to_track_hub(self, track_hub_url, file_name, genome):
html += """{db}={genome}&hgt.customText={track_hub_url}" />