-
Notifications
You must be signed in to change notification settings - Fork 0
/
ete_build.py
342 lines (303 loc) · 16.1 KB
/
ete_build.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
#!/usr/bin/env python
import argparse
import os
import sys
import subprocess
import tempfile
import json
import src.mafft as mafft
import src.muscle as muscle
import src.clustalo as clustalo
import src.tcoffee as tcoffee
import src.famsa as famsa
import src.trimal as trimal
import src.trim_alg as trim_alg
import src.clipkit as clipkit
import src.fasttree as fasttree
import src.phyml as phyml
import src.raxml as raxml
import src.iqtree as iqtree
import src.mrbayes as mrbayes
# Predefined workflows: each name maps the aligner, trimmer and tree-builder
# steps to a tool/section name (e.g. "mafft_default"). These names are matched
# against [section] headers of a custom .cfg file and reduced to the bare tool
# name before being passed to Nextflow.
# (A disabled "ana-workflow" using a hybrid aligner was sketched here earlier.)
PREDEFINED_WORKFLOWS = dict(
    workflow1=dict(aligner="mafft_default", trimmer="trimal_default", tree_builder="fasttree_default"),
    workflow2=dict(aligner="famsa_default", trimmer="trimal_default", tree_builder="fasttree_default"),
    workflow3=dict(aligner="famsa_default", trimmer="trim_alg_v2_default", tree_builder="fasttree_default"),
)

# Tool names recognised for each pipeline step.
ALIGNERS = "mafft muscle tcoffee clustalo famsa".split()
TRIMMERS = "trimal clipkit trim_alg_v2".split()
TREE_BUILDERS = "fasttree phyml raxml iqtree mrbayes".split()
def generate_nextflow_config(args):
    """
    Write ``nextflow.config`` to the current working directory.

    The file sets the ``params.*`` values the workflow reads. It also adds one
    ``withName`` selector per process (``align``, ``trim``, ``build``) that
    applies the executor, queue, time and memory settings. In SLURM mode each
    selector also gets ``clusterOptions`` that send the job's stderr/stdout to
    ``<output>/logs/slurm_<process>_%j.err`` / ``.out``.

    Every interpolated string is emitted as an escaped Groovy single-quoted
    literal, so values containing a quote or backslash cannot break the file.

    Parameters:
    - args (argparse.Namespace): must provide ``input``, ``output``, ``cpus``,
      ``aligner``, ``trimmer``, ``tree_builder``, ``memory``, ``time``,
      ``mode`` and ``slurm_partition`` (the latter is read only in SLURM mode).
    """
    def groovy_str(value):
        """Return *value* rendered as a Groovy single-quoted string literal."""
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    is_slurm = args.mode == "slurm"
    queue = args.slurm_partition if is_slurm else ""

    lines = [
        "",
        f"params.input = {groovy_str(args.input)}",
        f"params.output = {groovy_str(args.output)}",
        f"params.thread = {args.cpus}",
        f"params.aligner = {groovy_str(args.aligner)}",
        f"params.trimmer = {groovy_str(args.trimmer)}",
        f"params.tree_builder = {groovy_str(args.tree_builder)}",
        f"params.memory = {groovy_str(args.memory)}",
        f"params.time = {groovy_str(args.time)}",
        f"params.queue = {groovy_str(queue)}",
        f"params.executor = {groovy_str(args.mode)} // local or slurm",
        "",
        "process {",
    ]
    # The three processes share identical settings; only the SLURM log file
    # names differ, so generate the selectors instead of copy-pasting them.
    for name in ("align", "trim", "build"):
        lines.append(f"    withName: '{name}' {{")
        if is_slurm:
            log_stem = f"{args.output}/logs/slurm_{name}_%j"
            options = f"--error={log_stem}.err --output={log_stem}.out"
            lines.append(f"        clusterOptions = {groovy_str(options)}")
        lines += [
            "        executor = params.executor",
            "        queue = params.queue // SLURM queue name",
            "        time = params.time // SLURM time allocation",
            "        memory = params.memory // SLURM memory allocation",
            "    }",
        ]
    lines.append("}")

    with open("nextflow.config", "w", encoding="utf-8") as f:
        f.write("\n".join(lines) + "\n")
def remove_comment(line):
    """Return *line* with everything from the first ``#`` onward dropped, then whitespace-stripped."""
    code, _hash, _comment = line.partition('#')
    return code.strip()
# Option parsers for each pipeline step, keyed by a section's ``_app`` value.
# The lambdas defer the attribute lookup on the imported src.* modules until a
# matching section is actually parsed.
_SECTION_PARSERS = {
    "aligner": {
        "mafft": lambda opts: mafft.parse_mafft_options(opts),
        "muscle": lambda opts: muscle.parse_muscle_options(opts),
        "clustalo": lambda opts: clustalo.parse_clustalo_options(opts),
        "tcoffee": lambda opts: tcoffee.parse_tcoffee_options(opts),
        "famsa": lambda opts: famsa.parse_famsa_options(opts),
    },
    "trimmer": {
        "trimal": lambda opts: trimal.parse_trimal_options(opts),
        "trim_alg_v2": lambda opts: trim_alg.parse_trimalg_options(opts),
        "clipkit": lambda opts: clipkit.parse_clipkit_options(opts),
    },
    "tree_builder": {
        "fasttree": lambda opts: fasttree.parse_fasttree_options(opts),
        "phyml": lambda opts: phyml.parse_phyml_options(opts),
        "raxml": lambda opts: raxml.parse_raxml_options(opts),
        "iqtree": lambda opts: iqtree.parse_iqtree_options(opts),
        "mrbayes": lambda opts: mrbayes.parse_mrbayes_options(opts),
    },
}


def _coerce_cfg_value(raw):
    """
    Convert the raw right-hand side of a ``key = value`` line to a Python value.

    ``""`` becomes the empty string, all-digit values become ``int``, digits
    with at most one ``.`` become ``float``, and ``true``/``false`` (any case)
    become ``bool``. Anything else is returned stripped but otherwise unchanged.
    Negative numbers are not recognised and stay strings.
    """
    value = raw.strip()
    if value == '""':
        return ""
    if value.isdigit():
        return int(value)
    if value.replace('.', '', 1).isdigit():
        return float(value)
    if value.lower() in ("true", "false"):
        return value.lower() == "true"
    return value


def _store_section(config, name, data, aligner, trimmer, tree_builder):
    """
    Parse section *name* into *config* if it is the selected aligner, trimmer or tree builder.

    Sections that match none of the three names are ignored. A matching
    section is stored under its ``_app`` value. If a parser is registered for
    that app, the parsed result is stored; otherwise the raw key/value dict is.

    Raises:
    - ValueError: if a selected section has no ``_app`` entry.
    """
    if name == aligner:
        category = "aligner"
    elif name == trimmer:
        category = "trimmer"
    elif name == tree_builder:
        category = "tree_builder"
    else:
        return  # section not selected for this run

    app = data.get("_app")
    if app is None:
        raise ValueError(f"Section [{name}] in the config file has no '_app' entry")
    parser = _SECTION_PARSERS[category].get(app)
    config[category][app] = parser(data) if parser else data


def convert_cfg_to_json(cfg_file, aligner, trimmer, tree_builder):
    """
    Convert a .cfg file to a JSON-serialisable dict for the selected tools.

    The file is INI-like: ``[section]`` headers, ``key = value`` lines and
    ``#`` comments. Only the sections whose full names equal *aligner*,
    *trimmer* or *tree_builder* are kept. Each is converted with its tool's
    option parser, chosen by the section's ``_app`` value.

    Parameters:
    - cfg_file (str): Path to the .cfg file.
    - aligner (str): Exact section name of the aligner to include.
    - trimmer (str): Exact section name of the trimmer to include.
    - tree_builder (str): Exact section name of the tree builder to include.

    Returns:
    - dict: ``{"aligner": {...}, "trimmer": {...}, "tree_builder": {...}}``, each
      mapping the tool's ``_app`` name to its parsed options.

    Raises:
    - ValueError: if a selected section lacks an ``_app`` entry.
    """
    config = {"aligner": {}, "trimmer": {}, "tree_builder": {}}
    current_section = None
    section_data = {}

    with open(cfg_file, 'r') as file:
        for raw_line in file:
            line = remove_comment(raw_line)
            if not line:
                continue
            if line.startswith("[") and line.endswith("]"):
                # A new header closes the previous section.
                if current_section:
                    _store_section(config, current_section, section_data,
                                   aligner, trimmer, tree_builder)
                current_section = line[1:-1]  # keep the full section name
                section_data = {}
            elif "=" in line:
                key, value = line.split("=", 1)
                section_data[key.strip()] = _coerce_cfg_value(value)

    # The last section has no following header to close it.
    if current_section:
        _store_section(config, current_section, section_data,
                       aligner, trimmer, tree_builder)
    return config
def run_nextflow(mode, input_file, output_dir, aligner,
                 trimmer, tree_builder, memory, threads, log_file, work_dir,
                 supermatrix=False, target_species=None, coalescent=False, separator='_', field=0,
                 workflow_config=None, resume=False, script="ete_build_dsl2.nf"):
    """
    Build and run the ``nextflow run`` command, streaming its output to stdout.

    If *workflow_config* is a ``.cfg`` file, it is first converted to a ``.json``
    file next to it (same base name), and that JSON is passed to Nextflow
    instead. Tool names such as ``mafft_default`` are reduced to the bare tool
    name (``mafft``), except that any trimmer starting with ``trim_alg_v2`` is
    kept as ``trim_alg_v2``.

    Parameters:
    - mode (str): Execution mode. Not used here; the executor is taken from
      the generated nextflow.config. Kept for interface compatibility.
    - input_file, output_dir, log_file, work_dir (str): Paths passed to Nextflow.
    - aligner, trimmer, tree_builder (str): Tool or config-section names.
    - memory (str): Memory setting passed as ``--memory``.
    - threads (int): Thread count passed as ``--thread``.
    - supermatrix (bool) / target_species (str | None): ``--supermatrix_mode`` is
      added only when both are set.
    - coalescent (bool): Adds ``--coalescent_mode``.
    - separator, field: Species-name delimiter and field; converted to str.
    - workflow_config (str | None): Custom ``.cfg`` or ``.json`` config.
    - resume (bool): Adds Nextflow's ``-resume``.
    - script (str): Nextflow script to run.

    Returns:
    - int: Nextflow's exit code.
    """
    if workflow_config and workflow_config.endswith(".cfg"):
        cfg_json = convert_cfg_to_json(workflow_config, aligner, trimmer, tree_builder)
        # Swap only the extension; str.replace would also rewrite ".cfg" in directory names.
        json_file = os.path.splitext(workflow_config)[0] + ".json"
        with open(json_file, 'w') as out_json:
            json.dump(cfg_json, out_json, indent=4)
        workflow_config = json_file

    # Reduce config-section names like "mafft_default" to the bare tool name.
    aligner = aligner.split("_")[0]
    tree_builder = tree_builder.split("_")[0]
    # 'trim_alg_v2' contains underscores itself, so it must be preserved whole.
    trimmer = "trim_alg_v2" if trimmer.startswith("trim_alg_v2") else trimmer.split("_")[0]

    cmd = [
        "nextflow",
        "-C", "nextflow.config",
        "-log", log_file,
        "run", script,
        "--input", input_file,
        "--output", output_dir,
        "--aligner", aligner,
        "--trimmer", trimmer,
        "--tree_builder", tree_builder,
        "--memory", memory,
        "--thread", str(threads),
        "-work-dir", work_dir,
        # NOTE(review): "spanme" looks like a typo of "spname" (cf. --spname-field);
        # confirm against the parameter name used in the .nf script before renaming.
        "--spanme-delimiter", str(separator),
        # argv entries must be strings; the default field=0 is an int.
        "--spname-field", str(field),
    ]
    if workflow_config:
        cmd.extend(["--customConfig", workflow_config])
    if supermatrix and target_species:
        cmd.append("--supermatrix_mode")
        cmd.extend(["--target_species", target_species])
    if coalescent:
        cmd.append("--coalescent_mode")
    if resume:
        cmd.append("-resume")

    print(" ".join(cmd))
    # Merge stderr into stdout: Nextflow's error messages become visible, and
    # the child cannot block on a full, never-drained stderr pipe.
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                          universal_newlines=True) as process:
        for line in process.stdout:
            print(line.rstrip())
    return_code = process.returncode

    if return_code != 0:
        print(f"Error: Nextflow script {script} exited with code {return_code}", file=sys.stderr)
    return return_code
def main():
    """
    Parse the command line, prepare the output directory and nextflow.config, then run the workflow.

    Returns:
    - int: Nextflow's exit code. It is used as the process exit status when
      run as a script.
    """
    parser = argparse.ArgumentParser(description="Run Nextflow workflow.")
    # --mode has a default, so it is optional (it was previously also marked
    # required, which made the default unreachable).
    parser.add_argument("--mode", default='local', choices=["local", "slurm"], help="Execution mode: local or slurm.")
    parser.add_argument("--slurm-partition", help="SLURM partition name (required if mode is slurm).")
    parser.add_argument("--time", default="1h", help="Time limit for SLURM jobs (only if mode is slurm).")
    parser.add_argument("--memory", default="4GB", help="Memory allocation for SLURM jobs (only if mode is slurm).")
    parser.add_argument("--cpus", type=int, default=4, help="Number of CPUs for SLURM jobs (only if mode is slurm).")
    parser.add_argument("--script", default="ete_build_dsl2.nf", help="Path to the Nextflow script to run.")
    parser.add_argument("--input", required=True, help="Input fasta file or directory.")
    parser.add_argument("--output", required=True, help="Output directory.")
    parser.add_argument("--supermatrix", action="store_true", help="Enable supermatrix mode to concatenate individual gene alignments before building the tree.")
    parser.add_argument("--target-species", help="Path to the target species file for supermatrix mode.")
    parser.add_argument("--coalescent", action="store_true", help="Enable coalescent mode for supermatrix.")
    parser.add_argument("--aligner", default="mafft", help="Alignment tool.")
    parser.add_argument("--trimmer", default="none", help="Trimming tool.")
    parser.add_argument("--tree_builder", default="fasttree", help="Tree building tool.")
    parser.add_argument("--workflow", help="Select a predefined workflow.")
    parser.add_argument("--resume", action="store_true", help="Resume from the last failed step.")
    parser.add_argument("--config", help="Custom workflow config file.")
    parser.add_argument('--spname-delimiter', dest='separator', help='Separator for species code in sequence names.', default='|')
    parser.add_argument('--spname-field', dest='field', help='Field number for species code in sequence names.', default='0')
    args = parser.parse_args()

    # Validate argument combinations before touching the filesystem.
    if args.mode == "slurm" and not args.slurm_partition:
        parser.error("--slurm-partition is required when mode is slurm.")
    if args.workflow and args.workflow not in PREDEFINED_WORKFLOWS:
        parser.error(f"unknown --workflow {args.workflow!r}; "
                     f"choose from: {', '.join(PREDEFINED_WORKFLOWS)}")

    # Ensure the output directory exists (Nextflow creates the work dir itself).
    if not os.path.exists(args.output):
        print(f"Directory {args.output} does not exist. Creating it now.")
        os.makedirs(args.output, exist_ok=True)

    # Convert paths to absolute paths.
    args.input = os.path.abspath(args.input)
    args.output = os.path.abspath(args.output)
    work_dir = os.path.join(args.output, "work/")
    log_path = os.path.join(args.output, ".nextflow.log")

    if args.mode == "slurm":
        # The generated clusterOptions point SLURM's --error/--output files at
        # <output>/logs/, and SLURM does not create missing directories itself.
        os.makedirs(os.path.join(args.output, "logs"), exist_ok=True)

    # A predefined workflow overrides the individual tool choices.
    if args.workflow:
        workflow_params = PREDEFINED_WORKFLOWS[args.workflow]
        args.aligner = workflow_params["aligner"]
        args.trimmer = workflow_params["trimmer"]
        args.tree_builder = workflow_params["tree_builder"]

    # Generate the Nextflow config AFTER setting the workflow-specific parameters.
    generate_nextflow_config(args)

    return run_nextflow(args.mode, args.input, args.output, args.aligner,
                        args.trimmer, args.tree_builder, args.memory, args.cpus, log_path,
                        work_dir, args.supermatrix, args.target_species, args.coalescent,
                        args.separator, args.field, args.config, args.resume, args.script)


if __name__ == "__main__":
    # Propagate Nextflow's exit code so callers and schedulers can detect failures.
    sys.exit(main())