Nextflow ProteinFunction: support for SQLite db #1138

Draft · wants to merge 7 commits into base: main
37 changes: 24 additions & 13 deletions nextflow/ProteinFunction/bin/store_polyphen_scores.pl
@@ -5,7 +5,8 @@
use Bio::EnsEMBL::Variation::ProteinFunctionPredictionMatrix;
use Digest::MD5 qw(md5_hex);

my ($species, $port, $host, $user, $pass, $dbname,
my ($species, $offline, $sqlite,
$port, $host, $user, $pass, $dbname,
$peptide, $output_file, $model) = @ARGV;

# Extract model name
@@ -62,18 +63,28 @@

# save the predictions to the database unless they are null matrices
if ( $any_results ){
my $var_dba = Bio::EnsEMBL::Variation::DBSQL::DBAdaptor->new(
'-species' => $species,
'-port' => $port,
'-host' => $host,
'-user' => $user,
'-pass' => $pass,
'-dbname' => $dbname
);
my $pfpma = $var_dba->get_ProteinFunctionPredictionMatrixAdaptor
or die "Failed to get matrix adaptor";
$pfpma->store($pred_matrix);
$var_dba->dbc and $var_dba->dbc->disconnect_if_idle();
if (!$offline){
my $var_dba = Bio::EnsEMBL::Variation::DBSQL::DBAdaptor->new(
'-species' => $species,
'-port' => $port,
'-host' => $host,
'-user' => $user,
'-pass' => $pass,
'-dbname' => $dbname
);
my $pfpma = $var_dba->get_ProteinFunctionPredictionMatrixAdaptor
or die "Failed to get matrix adaptor";
$pfpma->store($pred_matrix);
$var_dba->dbc and $var_dba->dbc->disconnect_if_idle();
}

if ($sqlite){
my $dbh = DBI->connect("dbi:SQLite:dbname=$sqlite","","");
my $sth = $dbh->prepare("INSERT INTO predictions VALUES(?, ?, ?)");

my $attrib_id = $model_name eq "humdiv" ? 269 : 268;
$sth->execute($pred_matrix->translation_md5, $attrib_id, $pred_matrix->serialize);
}
} else {
warn "Skipping: no results to store\n";
}
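
Note: downstream consumers can read these rows back with plain DBI. A minimal sketch (not part of this PR; the database path and md5 below are placeholders) that fetches a serialized matrix using the analysis ids implied by the scripts in this PR — 269 for humdiv, 268 otherwise (presumably humvar, the only other PolyPhen-2 model), and 267 for SIFT:

#!/usr/bin/perl
use strict;
use warnings;
use DBI;

my $sqlite_db = "homo_sapiens_PolyPhen_SIFT.db";    # placeholder path
my $md5       = "0123456789abcdef0123456789abcdef"; # placeholder translation md5

my $dbh = DBI->connect("dbi:SQLite:dbname=$sqlite_db", "", "", { RaiseError => 1 });
my $sth = $dbh->prepare("SELECT matrix FROM predictions WHERE md5 = ? AND analysis = ?");
$sth->execute($md5, 269); # 269 = PolyPhen-2 humdiv

my ($matrix) = $sth->fetchrow_array;
print defined $matrix ? "matrix found\n" : "no matrix stored\n";
$dbh->disconnect;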
35 changes: 22 additions & 13 deletions nextflow/ProteinFunction/bin/store_sift_scores.pl
@@ -5,7 +5,8 @@
use Bio::EnsEMBL::Variation::ProteinFunctionPredictionMatrix;
use Digest::MD5 qw(md5_hex);

my ($species, $port, $host, $user, $pass, $dbname,
my ($species, $offline, $sqlite,
$port, $host, $user, $pass, $dbname,
$peptide, $res_file) = @ARGV;

# parse the results file
@@ -49,18 +50,26 @@

# save the predictions to the database
if ($results_available == 1 ){
my $var_dba = Bio::EnsEMBL::Variation::DBSQL::DBAdaptor->new(
'-species' => $species,
'-port' => $port,
'-host' => $host,
'-user' => $user,
'-pass' => $pass,
'-dbname' => $dbname
);
my $pfpma = $var_dba->get_ProteinFunctionPredictionMatrixAdaptor
or die "Failed to get matrix adaptor";
$pfpma->store($pred_matrix);
$var_dba->dbc and $var_dba->dbc->disconnect_if_idle();
if (!$offline){
my $var_dba = Bio::EnsEMBL::Variation::DBSQL::DBAdaptor->new(
'-species' => $species,
'-port' => $port,
'-host' => $host,
'-user' => $user,
'-pass' => $pass,
'-dbname' => $dbname
);
my $pfpma = $var_dba->get_ProteinFunctionPredictionMatrixAdaptor
or die "Failed to get matrix adaptor";
$pfpma->store($pred_matrix);
$var_dba->dbc and $var_dba->dbc->disconnect_if_idle();
}

if ($sqlite){
my $dbh = DBI->connect("dbi:SQLite:dbname=$sqlite","","");
my $sth = $dbh->prepare("INSERT INTO predictions VALUES(?, ?, ?)");
$sth->execute($pred_matrix->translation_md5, 267, $pred_matrix->serialize);
}
} else {
warn "Skipping: no results to store\n";
}
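
One caveat: several store_sift_scores / store_pph2_scores tasks may write to the same SQLite file concurrently, and SQLite locks the whole file for each write, so parallel inserts can fail with "database is locked". Also, both store scripts call DBI->connect directly, which assumes DBI is already loaded (the Ensembl adaptors pull it in, but an explicit use DBI; would be safer). A minimal guard against lock contention, sketched here assuming DBD::SQLite (the driver behind dbi:SQLite):

use DBI;

my $dbh = DBI->connect("dbi:SQLite:dbname=$sqlite", "", "", { RaiseError => 1 });

# wait up to 30 s for a competing writer to release its lock
# instead of failing immediately with "database is locked"
$dbh->sqlite_busy_timeout(30_000);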
43 changes: 37 additions & 6 deletions nextflow/ProteinFunction/main.nf
@@ -18,7 +18,12 @@ params.port = null
params.user = null
params.pass = null
params.database = null
params.offline = false

// SQLite database params
params.sqlite = params.offline
params.sqlite_dir = params.outdir.startsWith("/") ? params.outdir : "${workflow.launchDir}/${params.outdir}" // absolute-path check supports Unix-like systems only
params.sqlite_db = "${params.sqlite_dir}/${params.species}_PolyPhen_SIFT.db"
// SIFT params
params.sift_run_type = "NONE"
params.median_cutoff = 2.75 // as indicated in SIFT's README
@@ -84,7 +89,9 @@ if (params.help) {
// Module imports
include { decompress } from './nf_modules/utils.nf'
include { translate_fasta } from './nf_modules/translations.nf'
include { store_translation_mapping } from './nf_modules/database_utils.nf'
include { store_translation_mapping;
init_sqlite_db;
postprocess_sqlite_db } from './nf_modules/database_utils.nf'
include { run_sift_pipeline } from './nf_modules/sift.nf'
include { run_pph2_pipeline } from './nf_modules/polyphen2.nf'

@@ -101,8 +108,12 @@ if (!params.translated) {
}
}

if (!params.host || !params.port || !params.user || !params.pass || !params.database) {
exit 1, "Error: --host, --port, --user, --pass and --database need to be defined"
if (!params.offline && (!params.host || !params.port || !params.user || !params.pass || !params.database)) {
exit 1, "ERROR: --host, --port, --user, --pass and --database need to be defined"
}

if (params.offline) {
log.info "INFO: --offline mode selected; --sqlite is enabled by default. If you do not wish to generate a SQLite database, please use --sqlite 0."
}

// Check run type for each protein function predictor
@@ -156,6 +167,12 @@ def getFiles (files) {
}

workflow {
if (params.sqlite) {
sqlite_db_prep = init_sqlite_db()
} else {
sqlite_db_prep = "ready"
}

// Translate transcripts from GTF and FASTA if no translation FASTA is given
if (!params.translated) {
translate_fasta(getFiles(params.gtf), getFiles(params.fasta))
@@ -180,12 +197,26 @@ workflow {
name: "translation_mapping.tsv",
storeDir: params.outdir,
newLine: true) { it.id + "\t" + it.md5 }
store_translation_mapping(translation_mapping)
if (!params.offline) {
store_translation_mapping(translation_mapping)
}

// Get unique translations based on MD5 hashes of their sequences
translated = translated.unique { it.md5 }

// Run protein function prediction
if ( params.sift_run_type != "NONE" ) run_sift_pipeline( translated )
if ( params.pph_run_type != "NONE" ) run_pph2_pipeline( translated )
if ( params.sift_run_type != "NONE" ) {
sift_run = run_sift_pipeline( translated, sqlite_db_prep )
} else {
sift_run = "done"
}
if ( params.pph_run_type != "NONE" ) {
polyphen_run = run_pph2_pipeline( translated, sqlite_db_prep )
} else {
polyphen_run = "done"
}

if ( params.sqlite ) {
postprocess_sqlite_db(sift_run, polyphen_run)
}
}
12 changes: 8 additions & 4 deletions nextflow/ProteinFunction/nextflow.config
@@ -10,8 +10,12 @@ profiles {
}
}

//untested
slurm { process.executor = 'slurm' }
slurm {
process {
executor = 'slurm'
time = '3d'
}
}
}

singularity {
@@ -28,9 +32,9 @@ process {
// Exit status codes:
// - 130: job exceeded LSF allocated memory
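// - 137: job killed with SIGKILL (exit 128 + 9), typically an out-of-memory kill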
// - 140: job exceeded SLURM allocated resources (memory, CPU, time)
errorStrategy = { task.exitStatus in [130, 140] ? 'retry' : 'finish' }
errorStrategy = { task.exitStatus in [130, 137, 140] ? 'retry' : 'finish' }
withLabel: retry_before_ignoring {
errorStrategy = { task.exitStatus in [130, 140] ? 'retry' : 'ignore' }
errorStrategy = { task.exitStatus in [130, 137, 140] ? 'retry' : 'ignore' }
}
maxRetries = 3
}
35 changes: 35 additions & 0 deletions nextflow/ProteinFunction/nf_modules/database_utils.nf
@@ -90,3 +90,38 @@ process get_current_MD5_translations {
EOF
"""
}

process init_sqlite_db {
cache false

output: stdout

"""
#!/usr/bin/perl

use DBI;

my \$dbh = DBI->connect("dbi:SQLite:dbname=${params.sqlite_db}","","");
\$dbh->do("DROP TABLE IF EXISTS predictions");
\$dbh->do("CREATE TABLE predictions(md5, analysis, matrix)");
"""
}

process postprocess_sqlite_db {
cache false

input:
val sift_run
val polyphen_run

output: stdout

"""
#!/usr/bin/perl

use DBI;

my \$dbh = DBI->connect("dbi:SQLite:dbname=${params.sqlite_db}","","");
\$dbh->do("CREATE INDEX md5_idx ON predictions(md5)");
"""
}
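
Since failed tasks are retried (maxRetries = 3 in nextflow.config), a store task that dies after its INSERT has committed would add a duplicate (md5, analysis) row on retry. A possible hardening, sketched here against the same schema rather than as part of this PR: declare uniqueness in init_sqlite_db and make the inserts idempotent.

# in init_sqlite_db: at most one row per (md5, analysis) pair
$dbh->do("CREATE TABLE predictions(md5 TEXT, analysis INTEGER, matrix BLOB,
          UNIQUE(md5, analysis))");

# in the store scripts: replace any earlier row instead of duplicating it
my $sth = $dbh->prepare("INSERT OR REPLACE INTO predictions VALUES(?, ?, ?)");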
19 changes: 14 additions & 5 deletions nextflow/ProteinFunction/nf_modules/polyphen2.nf
@@ -98,9 +98,12 @@ process store_pph2_scores {
val species
tuple val(peptide), path(weka_output), val(model)

output:
stdout

"""
store_polyphen_scores.pl $species ${params.port} ${params.host} \
${params.user} ${params.pass} ${params.database} \
store_polyphen_scores.pl ${species} ${params.offline} ${params.sqlite_db} \
${params.port} ${params.host} ${params.user} ${params.pass} ${params.database} \
${peptide.seqString} ${weka_output} ${model}
"""
}
@@ -110,16 +113,20 @@ include { delete_prediction_data; update_meta } from './database_utils.nf'
include { filter_existing_translations } from './translations.nf'

workflow run_pph2_pipeline {
take: translated
take:
translated
sqlite_db_prep
main:
if ( params.pph_run_type == "UPDATE" ) {
if ( params.pph_run_type == "UPDATE" && !params.offline ) {
translated = filter_existing_translations( "polyphen_%", translated )
wait = "ready"
} else if ( params.pph_run_type == "FULL" ) {
} else if ( params.pph_run_type == "FULL" && !params.offline ) {
delete_prediction_data("polyphen_%")
wait = delete_prediction_data.out
get_pph2_version()
update_meta("polyphen_version", get_pph2_version.out)
} else {
wait = "ready"
}
// Run PolyPhen-2 and Weka
run_pph2_on_all_aminoacid_substitutions(translated)
@@ -129,4 +136,6 @@ workflow run_pph2_pipeline {
run_weka(weka_model, run_pph2_on_all_aminoacid_substitutions.out)
store_pph2_scores(wait, // wait for data deletion
params.species, run_weka.out)
emit:
store_pph2_scores.out
}
19 changes: 14 additions & 5 deletions nextflow/ProteinFunction/nf_modules/sift.nf
@@ -107,9 +107,12 @@ process store_sift_scores {
val species
tuple val(peptide), path(sift_scores)

output:
stdout

"""
store_sift_scores.pl ${species} ${params.port} ${params.host} \
${params.user} ${params.pass} ${params.database} \
store_sift_scores.pl ${species} ${params.offline} ${params.sqlite_db} \
${params.port} ${params.host} ${params.user} ${params.pass} ${params.database} \
${peptide.seqString} ${sift_scores}
"""
}
@@ -131,16 +134,20 @@ workflow update_sift_db_version {
}

workflow run_sift_pipeline {
take: translated
take:
translated
sqlite_db_prep
main:
if ( params.sift_run_type == "UPDATE" ) {
if ( params.sift_run_type == "UPDATE" && !params.offline ) {
translated = filter_existing_translations( "sift", translated )
wait = "ready"
} else if ( params.sift_run_type == "FULL" ) {
} else if ( params.sift_run_type == "FULL" && !params.offline ) {
delete_prediction_data("sift")
wait = delete_prediction_data.out
update_sift_version()
update_sift_db_version( file(params.blastdb) )
} else {
wait = "ready"
}
// Align translated sequences against BLAST database to run SIFT
align_peptides(translated,
@@ -150,4 +157,6 @@ workflow run_sift_pipeline {
store_sift_scores(wait, // wait for data deletion
params.species,
run_sift_on_all_aminoacid_substitutions.out)
emit:
store_sift_scores.out
}
2 changes: 2 additions & 0 deletions nextflow/utils/utils.nf
@@ -71,6 +71,8 @@ def check_JVM_mem (min=0.4) {
@param min Min memory in GiB (default: 0.4)
*/

// convert min to a float to avoid errors when rounding
min = min / 10.0 * 10.0
mem = Runtime.getRuntime().maxMemory() / (1024 ** 3) // in GiB
if (mem < min) {
log.error """