Add benchmarking mode (resolves #101) #107

Open · wants to merge 14 commits into master
29 changes: 24 additions & 5 deletions src/toil_lib/spark.py
@@ -27,20 +27,26 @@

def spawn_spark_cluster(job,
numWorkers,
sparkMasterContainer="quay.io/ucsc_cgl/apache-spark-master:1.5.2",
sparkWorkerContainer="quay.io/ucsc_cgl/apache-spark-worker:1.5.2",
cores=None,
memory=None,
disk=None,
overrideLeaderIP=None):
'''
:param numWorkers: The number of worker nodes to have in the cluster. \
Must be greater than or equal to 1.
:param sparkMasterContainer: The Docker image to run for the Spark master.
:param sparkWorkerContainer: The Docker image to run for the Spark worker.
:param cores: Optional parameter to set the number of cores per node. \
If not provided, we use the number of cores on the node that launches \
the service.
:param memory: Optional parameter to set the memory requested per node.
:param disk: Optional parameter to set the disk requested per node.
:type leaderMemory: int or string convertable by bd2k.util.humanize.human2bytes to an int
:type numWorkers: int
:type sparkMasterContainer: str
:type sparkWorkerContainer: str
:type leaderMemory: int or string convertable by bd2k.util.humanize.human2bytes to an int
:type cores: int
:type memory: int or string convertable by bd2k.util.humanize.human2bytes to an int
:type disk: int or string convertable by bd2k.util.humanize.human2bytes to an int
@@ -49,13 +55,15 @@ def spawn_spark_cluster(job,
if numWorkers < 1:
raise ValueError("Must have at least one worker. %d given." % numWorkers)

leaderService = SparkService(cores=cores,
leaderService = SparkService(sparkMasterContainer,
cores=cores,
memory=memory,
disk=disk,
overrideLeaderIP=overrideLeaderIP)
leaderIP = job.addService(leaderService)
for i in range(numWorkers):
job.addService(WorkerService(leaderIP,
sparkWorkerContainer,
cores=cores,
disk=disk,
memory=memory),
@@ -98,16 +106,19 @@ class SparkService(Job.Service):
"""

def __init__(self,
sparkContainer,
memory=None,
disk=None,
cores=None,
overrideLeaderIP=None):
"""
:param sparkContainer: The Docker container name to run for Spark.
:param memory: The amount of memory to be requested for the Spark leader. Optional.
:param disk: The amount of disk to be requested for the Spark leader. Optional.
:param cores: Optional parameter to set the number of cores per node. \
If not provided, we use the number of cores on the node that launches \
the service.
:type sparkContainer: str
:type memory: int or string convertable by bd2k.util.humanize.human2bytes to an int
:type disk: int or string convertable by bd2k.util.humanize.human2bytes to an int
:type cores: int
@@ -117,6 +128,7 @@ def __init__(self,
cores = multiprocessing.cpu_count()

self.hostname = overrideLeaderIP
self.sparkContainer = sparkContainer

Job.Service.__init__(self, memory=memory, cores=cores, disk=disk)

@@ -135,9 +147,10 @@ def start(self, job):
self.sparkContainerID = dockerCheckOutput(job=job,
defer=STOP,
workDir=os.getcwd(),
tool="quay.io/ucsc_cgl/apache-spark-master:1.5.2",
tool=self.sparkContainer,
dockerParameters=["--net=host",
"-d",
"-v", "/var/run/docker.sock:/var/run/docker.sock",
"-v", "/mnt/ephemeral/:/ephemeral/:rw",
"-e", "SPARK_MASTER_IP=" + self.hostname,
"-e", "SPARK_LOCAL_DIRS=/ephemeral/spark/local",
@@ -188,19 +201,24 @@ class WorkerService(Job.Service):
Should not be called outside of `SparkService`.
"""

def __init__(self, masterIP, memory=None, cores=None, disk=None):
def __init__(self, masterIP, sparkContainer, memory=None, cores=None, disk=None):
"""
:param masterIP: The IP of the Spark master.
:param sparkContainer: The container to run for Spark.
:param memory: The memory requirement for each node in the cluster. Optional.
:param disk: The disk requirement for each node in the cluster. Optional.
:param cores: Optional parameter to set the number of cores per node. \
If not provided, we use the number of cores on the node that launches \
the service.
:type masterIP: str
:type sparkContainer: str
:type memory: int or string convertable by bd2k.util.humanize.human2bytes to an int
:type disk: int or string convertable by bd2k.util.humanize.human2bytes to an int
:type cores: int
"""

self.masterIP = masterIP
self.sparkContainer = sparkContainer

if cores is None:
cores = multiprocessing.cpu_count()
@@ -219,9 +237,10 @@ def start(self, job):
self.sparkContainerID = dockerCheckOutput(job=job,
defer=STOP,
workDir=os.getcwd(),
tool="quay.io/ucsc_cgl/apache-spark-worker:1.5.2",
tool=self.sparkContainer,
dockerParameters=["--net=host",
"-d",
"-v", "/var/run/docker.sock:/var/run/docker.sock",
"-v", "/mnt/ephemeral/:/ephemeral/:rw",
"-e",
"\"SPARK_MASTER_IP=" + self.masterIP + ":" + _SPARK_MASTER_PORT + "\"",
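For context, a workflow built on this change could request a Spark cluster with custom images roughly as follows. This is a sketch only, not part of the PR: the image tags, the resource values, and the start_cluster job function are assumptions.

def start_cluster(job, num_workers):
    # Hypothetical job function: spin up a Spark cluster using non-default
    # master/worker images via the new keyword arguments.
    from toil_lib.spark import spawn_spark_cluster
    return spawn_spark_cluster(job,
                               num_workers,
                               sparkMasterContainer='quay.io/ucsc_cgl/apache-spark-master:2.0.2',
                               sparkWorkerContainer='quay.io/ucsc_cgl/apache-spark-worker:2.0.2',
                               cores=4,
                               memory='8G',
                               disk='20G')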
11 changes: 11 additions & 0 deletions src/toil_lib/tools/__init__.py
@@ -2,6 +2,17 @@
import subprocess


def log_runtime(job, start, end, cmd):
"""
Logs the wall-clock runtime of a command to the Toil leader log.

:param JobFunctionWrappingJob job: passed automatically by Toil.
:param float start: Start time, in seconds since the epoch (e.g. time.time()).
:param float end: End time, in seconds since the epoch.
:param str cmd: Name of the command that was timed.
"""
elapsed_time = end - start

hours = int(elapsed_time) // (60 * 60)
minutes = int(elapsed_time - (60 * 60 * hours)) // 60
seconds = int(elapsed_time - (60 * 60 * hours) - (60 * minutes)) % 60

job.fileStore.logToMaster("%s ran in %dh%dm%ds" % (cmd, hours, minutes, seconds))


def get_mean_insert_size(work_dir, bam_name):
"""Function taken from MC3 Pipeline"""
cmd = "docker run --log-driver=none --rm -v {}:/data quay.io/ucsc_cgl/samtools " \
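As a standalone sanity check of the arithmetic in log_runtime, an elapsed time of 4000 seconds breaks down into 1 h, 6 m, 40 s; the snippet below mirrors the function's computation and is independent of Toil.

elapsed_time = 4000  # seconds

hours = int(elapsed_time) // (60 * 60)                                 # 1
minutes = int(elapsed_time - (60 * 60 * hours)) // 60                  # 6
seconds = int(elapsed_time - (60 * 60 * hours) - (60 * minutes)) % 60  # 40

assert (hours, minutes, seconds) == (1, 6, 40)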
169 changes: 167 additions & 2 deletions src/toil_lib/tools/aligners.py
@@ -1,8 +1,11 @@
import os
import subprocess
import time

from toil.lib.docker import dockerCall

from toil_lib import require
from toil_lib.tools import log_runtime
from toil_lib.urls import download_url


@@ -82,7 +85,11 @@ def run_star(job, r1_id, r2_id, star_index_url, wiggle=False, sort=True):
return transcriptome_id, aligned_id, log_id, sj_id


def run_bwakit(job, config, sort=True, trim=False, mark_secondary=False):
def run_bwakit(job, config,
sort=True,
trim=False,
mark_secondary=False,
benchmarking=False):
"""
Runs BWA-Kit to align single or paired-end fastq files or realign SAM/BAM files.

@@ -118,6 +125,8 @@ def run_bwakit(job, config, sort=True, trim=False, mark_secondary=False):
:param bool sort: If True, sorts the BAM
:param bool trim: If True, performs adapter trimming
:param bool mark_secondary: If True, mark shorter split reads as secondary
:param bool benchmarking: If True, returns the runtime along with the
FileStoreID.
:return: FileStoreID of BAM, or a tuple of (FileStoreID, runtime in seconds)
if benchmarking is True
:rtype: str or tuple
"""
@@ -180,9 +189,165 @@ def run_bwakit(job, config, sort=True, trim=False, mark_secondary=False):
for sample in samples:
parameters.append('/data/{}'.format(sample))

start_time = time.time()
dockerCall(job=job, tool='quay.io/ucsc_cgl/bwakit:0.7.12--c85ccff267d5021b75bb1c9ccf5f4b79f91835cc',
parameters=parameters, workDir=work_dir)
end_time = time.time()
log_runtime(job, start_time, end_time, 'bwakit')

# Either write file to local output directory or upload to S3 cloud storage
job.fileStore.logToMaster('Aligned sample: {}'.format(config.uuid))
return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'aligned.aln.bam'))
bam_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'aligned.aln.bam'))
if benchmarking:
return (bam_id, (end_time - start_time))
else:
return bam_id
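For reference, a downstream job might unpack the new benchmarking return value roughly as below. This is a sketch, not part of the PR; report_bwakit_runtime is a hypothetical job function.

def report_bwakit_runtime(job, bwakit_rv):
    # With benchmarking=True, run_bwakit returns a (FileStoreID, runtime in
    # seconds) tuple instead of a bare FileStoreID.
    bam_id, runtime = bwakit_rv
    job.fileStore.logToMaster('bwakit runtime: %.1f s' % runtime)
    return bam_id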


def run_bowtie2(job,
read1,
index_ids,
read2=None,
benchmarking=False):
'''
Runs bowtie2 for alignment.

:param JobFunctionWrappingJob job: passed automatically by Toil.
:param str read1: The FileStoreID of the first-of-pair file.
:param list index_ids: The FileStoreIDs of the six bowtie2 index files, in
order: NAME.1.bt2, NAME.2.bt2, NAME.3.bt2, NAME.4.bt2, NAME.rev.1.bt2,
NAME.rev.2.bt2.
:param str read2: The (optional) FileStoreID of the second-of-pair file.
:param bool benchmarking: If True, returns the runtime along with the
FileStoreID.
:return: FileStoreID of the SAM, or a tuple of (FileStoreID, runtime in
seconds) if benchmarking is True
:rtype: str or tuple
'''
work_dir = job.fileStore.getLocalTempDir()
file_ids = [read1]
file_ids.extend(index_ids)
file_names = ['read1.fq',
'ref.1.bt2', 'ref.2.bt2', 'ref.3.bt2', 'ref.4.bt2',
'ref.rev.1.bt2', 'ref.rev.2.bt2']

parameters = ['-x', '/data/ref',
'-1', '/data/read1.fq',
'-S', '/data/sample.sam',
'-p', str(job.cores)]

if read2:
file_ids.append(read2)
file_names.append('read2.fq')
parameters.extend(['-2', '/data/read2.fq'])
for file_store_id, name in zip(file_ids, file_names):
job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name))

start_time = time.time()
dockerCall(job=job,
workDir=work_dir,
parameters=parameters,
tool='quay.io/ucsc_cgl/bowtie2')
end_time = time.time()
log_runtime(job, start_time, end_time, 'bowtie2')

sam_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.sam'))
if benchmarking:
return (sam_id, (end_time - start_time))
else:
return sam_id
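A hypothetical wiring of run_bowtie2 with benchmarking enabled might look like the sketch below; the align_sample job function, the FileStoreID arguments, and the resource values are assumptions, not part of this PR.

def align_sample(job, read1_id, read2_id, bt2_index_ids):
    # bt2_index_ids: the six .bt2 FileStoreIDs in the order run_bowtie2 expects.
    bowtie2 = job.addChildJobFn(run_bowtie2,
                                read1_id,
                                bt2_index_ids,
                                read2=read2_id,
                                benchmarking=True,
                                cores=8, memory='16G', disk='10G')
    # The promise resolves to (sam_id, runtime_in_seconds).
    return bowtie2.rv()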


def run_snap(job,
read1,
index_ids,
read2=None,
sort=False,
mark_duplicates=False,
benchmarking=False):
'''
Runs SNAP for alignment.

If both first- and second-of-pair reads are passed, runs in paired mode.
Else runs in single mode.

:param JobFunctionWrappingJob job: passed automatically by Toil.
:param str read1: The FileStoreID of the first-of-pair file.
:param list index_ids: The FileStoreIDs of the SNAP index files, in order:
Genome, GenomeIndex, GenomeIndexHash, OverflowTable.
:param str read2: The (optional) FileStoreID of the second-of-pair file.
:param bool sort: If True, sorts the reads. Defaults to False. If enabled,
this function will also return the BAM index.
:param bool mark_duplicates: If True, marks reads as duplicates. Defaults
to False. This option is only valid if sort is True.
:param bool benchmarking: If True, returns the runtime along with the
FileStoreID.
:return: FileStoreID of the BAM, followed by the FileStoreID of the BAM index
if sort is True, followed by the runtime in seconds if benchmarking is True
:rtype: str or tuple
'''
work_dir = job.fileStore.getLocalTempDir()
file_ids = [read1]
file_ids.extend(index_ids)
file_names = ['read1.fq',
'Genome',
'GenomeIndex',
'GenomeIndexHash',
'OverflowTable']

if read2:
file_ids.append(read2)
file_names.append('read2.fq')

parameters = ['paired',
'/data/',
'/data/read1.fq',
'/data/read2.fq',
'-o', '-bam', '/data/sample.bam',
'-t', str(job.cores)]
else:

parameters = ['single',
'/data/',
'/data/read1.fq',
'-o', '-bam', '/data/sample.bam',
'-t', str(job.cores)]

if sort:

parameters.append('-so')

if not mark_duplicates:
parameters.extend(['-S', 'd'])

else:

require(not mark_duplicates,
'Cannot run duplicate marking if sort is not enabled.')

for file_store_id, name in zip(file_ids, file_names):
job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name))

start_time = time.time()
dockerCall(job=job,
workDir=work_dir,
parameters=parameters,
tool='quay.io/ucsc_cgl/snap')
end_time = time.time()
log_runtime(job, start_time, end_time, 'snap (sort={}, dm={})'.format(sort,
mark_duplicates))

bam_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.bam'))
if not sort:
if benchmarking:
return (bam_id, (end_time - start_time))
else:
return bam_id
else:
bai_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.bam.bai'))
if benchmarking:
return (bam_id, bai_id, (end_time - start_time))
else:
return (bam_id, bai_id)
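Because run_snap's return shape depends on both sort and benchmarking, a caller has to account for all four cases. The sketch below is illustrative only; handle_snap_output is a hypothetical job function, not part of this PR.

def handle_snap_output(job, snap_rv, sorted_output, benchmarking):
    # run_snap returns, in order: the BAM FileStoreID, then the BAM index
    # FileStoreID when sort=True, then the runtime when benchmarking=True.
    runtime = None
    if sorted_output and benchmarking:
        bam_id, bai_id, runtime = snap_rv
    elif sorted_output:
        bam_id, bai_id = snap_rv
    elif benchmarking:
        bam_id, runtime = snap_rv
    else:
        bam_id = snap_rv
    if runtime is not None:
        job.fileStore.logToMaster('snap runtime: %.1f s' % runtime)
    return bam_id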
