diff --git a/.github/workflows/on-pull-request.yaml b/.github/workflows/on-pull-request.yaml index 712ad330..35ec223a 100755 --- a/.github/workflows/on-pull-request.yaml +++ b/.github/workflows/on-pull-request.yaml @@ -46,4 +46,4 @@ jobs: - name: Test with pytest # We do not want it to run the email tests because the credentials are not stored in GitHub run: | - python3 -m pytest -k 'not email' + python3 -m pytest -k 'not email and not wscleaner' diff --git a/.gitignore b/.gitignore index d814cb1a..e906459a 100755 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ seglh_naming.egg-info/ venv/ temp/ .coverage +*data_unzipped \ No newline at end of file diff --git a/README.md b/README.md index 4e4c65b4..6648e397 100755 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ This repository contains the main scripts for routine analysis of clinical next |[demultiplex.py](demultiplex.py) | Command line | Demultiplex (excluding TSO runs) and calculate cluster density for Illumina NGS data using `bcl2fastq2` [(guide)](demultiplex/README.md) | | [setoff_workflows.py](setoff_workflows.py) | Command line | Upload NGS data to DNAnexus and trigger in-house workflows [(guide)](setoff_workflows/README.md) | | [upload_runfolder](upload_runfolder) | Command line or module import | Uploads an Illumina runfolder to DNAnexus [(guide)](upload_runfolder/README.md)| +| [wscleaner](wscleaner) | Command line | Automates the deletion of runfolders that have been uploaded to the DNAnexus cloud storage service [(guide)](wscleaner/README.md)| # Assumptions / Requirements @@ -16,6 +17,8 @@ Each runfolder must be discrete per workflow, therefore must consist of only one * SNP * WES * Custom Panels / LRPCR +* ONCODEEP +* DEV (with or without UMIs) The type of run is detected by the scripts by matching the Pan numbers within the sample names in the corresponding samplesheet to the pan numbers in the [panel_config](config/panel_config.py). @@ -23,7 +26,7 @@ The type of run is detected by the scripts by matching the Pan numbers within th The script has been tested using python v3.10.6 therefore it is recommended that this version of python is used. 
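For reference, the run-type detection described above can be illustrated with a minimal sketch. This is not the repository's implementation: `PANEL_TYPES` and `detect_run_type` are hypothetical stand-ins for the real `panel_config` contents and matching logic; the Pan number used is taken from this PR's test data.

```python
import re

# Hypothetical Pan number -> run type mapping; the real mapping lives in
# config/panel_config.py (this is an illustrative stand-in)
PANEL_TYPES = {"Pan5180": "dev"}


def detect_run_type(samplesheet_lines: list) -> set:
    """Collect run types for every Pan number found in the sample names"""
    run_types = set()
    for line in samplesheet_lines:
        for pan_num in re.findall(r"Pan\d+", line):
            if pan_num in PANEL_TYPES:
                run_types.add(PANEL_TYPES[pan_num])
    return run_types


# A sample name from this PR's test data contains Pan5180
print(detect_run_type(["TSTRUN01_01_000000_000000_TEST_Pan5180_S1_R1_001"]))
```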
-Dependencies, which include the [samplesheet_validator](https://github.com/moka-guys/samplesheet_validator) package**, are installed using the requirements.txt file:
+Dependencies, which include the [samplesheet_validator](https://github.com/moka-guys/samplesheet_validator) package\*\*, are installed using the requirements.txt file:

```bash
pip3 install -r requirements.txt
```

@@ -52,18 +55,18 @@ The below diagram is a UML class diagram showing the relationships between the c
| [demultiplex](demultiplex) | orange | Demultiplex (excluding TSO runs) and calculate cluster density for Illumina NGS data using `bcl2fastq2` [(guide)](demultiplex/README.md) |
| [setoff_workflows](setoff_workflows) | pink | Upload NGS data to DNAnexus and trigger in-house workflows [(guide)](setoff_workflows/README.md) |
| [toolbox](toolbox) | grey | Contains shared classes and functions [(guide)](toolbox/README.md) |
-| [upload_runfolder](upload_runfolder) | purple | Uploads an Illumina runfolder to DNAnexus [(guide)](upload_runfolder/README.md) |
+| [upload_runfolder](upload_runfolder) | sand | Uploads an Illumina runfolder to DNAnexus [(guide)](upload_runfolder/README.md) |
+| [wscleaner](wscleaner) | purple | Automates the deletion of runfolders that have been uploaded to the DNAnexus cloud storage service [(guide)](wscleaner/README.md) |

### Class and Package Diagrams

Class and package diagrams were generated by running the following command from the project root:

```bash
-pyreverse -o png -p automate_demultiplex . --ignore=test --source-roots . --colorized --color-palette=#CBC3E3,#99DDFF,#44BB99,#BBCC33,#EEDD88,#EE8866,#FFAABB,#DDDDDD --output-directory img/
+pyreverse -o png -p automate_demultiplex . --ignore=test --source-roots . --colorized --color-palette=#CBC3E3,#99DDFF,#44BB99,#BBCC33,#EEDD88,#EE8866,#FFAABB,#DDDDDD,#eab676 --output-directory img/
```
-
-
## Package Diagram

![alt text](img/packages_automate_demultiplex.png)

@@ -89,7 +92,8 @@ The above image describes the possible associations in the Class Diagram. In the
| sw (script_logger) | Records script-level logs for the setoff workflows script | `TIMESTAMP_setoff_workflow.log` | `/usr/local/src/mokaguys/automate_demultiplexing_logfiles/sw_script_logfiles/` |
| sw (rf_loggers["sw"]) | Records runfolder-level logs for the setoff workflows script | `RUNFOLDERNAME_setoff_workflow.log` | `/usr/local/src/mokaguys/automate_demultiplexing_logfiles/sw_script_logfiles/` |
| dx_run_script | Records the dx run commands for processing the run. N.B. this is not written to by logging | `RUNFOLDERNAME_dx_run_commands.sh` | `/usr/local/src/mokaguys/automate_demultiplexing_logfiles/dx_run_commands` |
-| decision_support_upload_cmds | Records the dx run commands to set off the congenica upload apps. N.B. this is not written to by logging | `RUNFOLDERNAME_decision_support.sh` | `/usr/local/src/mokaguys/automate_demultiplexing_logfiles/dx_run_commands` |
+| post_run_dx_run_script | Records the postprocessing commands (TSO runs only), to be run manually after the pipeline apps complete. N.B. this is not written to by logging | `RUNFOLDERNAME_post_run_commands.sh` | `/usr/local/src/mokaguys/automate_demultiplexing_logfiles/dx_run_commands` |
+| decision_support_upload_cmds_script | Records the dx run commands to set off the congenica upload apps. N.B.
this is not written to by logging | `RUNFOLDERNAME_decision_support.sh` | `/usr/local/src/mokaguys/automate_demultiplexing_logfiles/dx_run_commands` | | proj_creation_script | Records the commands for creating the DNAnexus project. N.B. this is not written to by logging | `RUNFOLDERNAME_create_nexus_project.sh` | `/usr/local/src/mokaguys/automate_demultiplexing_logfiles/dx_run_commands` | | Demultiplex output | Catches any traceback from errors when running the cron job that are not caught by exception handling within the script | `TIMESTAMP.txt` | `/usr/local/src/mokaguys/automate_demultiplexing_logfiles/Demultiplexing_stdout` | | demultiplex (script_logger) | Records script-level logs for the demultiplex script | `TIMESTAMP_demultiplex_script.log` | `/usr/local/src/mokaguys/automate_demultiplexing_logfiles/demultiplexing_script_logfiles/` | @@ -97,11 +101,12 @@ The above image describes the possible associations in the Class Diagram. In the Bcl2fastq output | STDOUT and STDERR from bcl2fastq2 | `bcl2fastq2_output.log` | Within the runfolder | | ss_validator | Records runfolder-level logs for the samplesheet_validator script | `RUNFOLDERNAME_samplesheet_validator_script.log` | `/usr/local/src/mokaguys/automate_demultiplexing_logfiles/samplesheet_validator_script_logfiles/` | | backup | Records the logs from the upload runfolder script | `RUNFOLDERNAME_upload_runfolder.log` | `/usr/local/src/mokaguys/automate_demultiplexing_logfiles/upload_runfolder_script_logfiles/` | +| wscleaner | Records the logs from the wscleaner script | `TIMESTAMP_wscleaner.log` | `/usr/local/src/mokaguys/automate_demultiplexing_logfiles/wscleaner/` | # Pytest -[test](test) contains test data ([/test/data](../test/data)) and test scripts (these use pytest). +[test](test) contains test data ([/test/data](../test/data)), and test scripts within individual modules (these use pytest). Tests can be executed using the following command. 
It is important to include the ignore flag to prevent pytest from scanning for tests through all test files, which slows down the tests considerably @@ -116,11 +121,12 @@ Currently test suite coverage is as follows: | Module | Coverage | | ------ | -------- | | [ad_email.py](ad_email/ad_email.py) | 94 | -| [ad_logger.py](ad_logger/ad_logger.py) | 81 | -| [demultiplex.py](demultiplex/demultiplex.py) | 76 | +| [ad_logger.py](ad_logger/ad_logger.py) | 100 | +| [demultiplex.py](demultiplex/demultiplex.py) | 83 | | [setoff_workflows.py](setoff_workflows/setoff_workflows.py) | 0 | | [upload_runfolder.py](upload_runfolder/upload_runfolder.py) | 0 | -| [toolbox.py](toolbox/toolbox.py) | 0 | +| [toolbox.py](toolbox/toolbox.py) | 76 | +| [wscleaner.py](wscleaner/wscleaner.py) | 46 | **TESTS AND TEST CASES/FILES *MUST* BE MAINTAINED AND UPDATED ACCORDINGLY IN CONJUNCTION WITH SCRIPT DEVELOPMENT** diff --git a/ad_email/ad_email.py b/ad_email/ad_email.py index 6ff3d626..3ef0ffdc 100755 --- a/ad_email/ad_email.py +++ b/ad_email/ad_email.py @@ -5,6 +5,7 @@ - AdEmail Send email to recipient via SMTP """ + import sys import os import jinja2 @@ -111,7 +112,9 @@ def send_email( self.msg["Subject"] = email_subject self.msg["From"] = self.sender self.msg["To"] = recipients - self.msg.attach(MIMEText(email_message, "html", "utf-8")) # Add msg to e-mail body + self.msg.attach( + MIMEText(email_message, "html", "utf-8") + ) # Add msg to e-mail body self.logger.info(self.logger.log_msgs["sending_email"], self.msg) # Configure SMTP server connection for sending email with smtplib.SMTP( diff --git a/test/test_ad_email.py b/ad_email/test_ad_email.py similarity index 91% rename from test/test_ad_email.py rename to ad_email/test_ad_email.py index a10d6fe6..14fe670d 100755 --- a/test/test_ad_email.py +++ b/ad_email/test_ad_email.py @@ -4,16 +4,22 @@ workstation where the required auth details are stored """ +import os import pytest -from .conftest import logger_obj from ad_email.ad_email import AdEmail from config.ad_config import AdEmailConfig - -logger_obj = logger_obj +from ..conftest import test_data_temp +from ad_logger import ad_logger # TODO finish this test suite as it is currently incomplete +@pytest.fixture(scope="function") +def logger_obj(): + temp_log = os.path.join(test_data_temp, "temp.log") + return ad_logger.AdLogger(__name__, "demux", temp_log).get_logger() + + class TestAdEmail: """ Test Email class diff --git a/ad_logger/ad_logger.py b/ad_logger/ad_logger.py index 026f88f1..d24e4285 100755 --- a/ad_logger/ad_logger.py +++ b/ad_logger/ad_logger.py @@ -1,6 +1,7 @@ """ Automate demultiplex logging. Classes required for logging """ + import sys import re import logging @@ -31,14 +32,14 @@ def get_logging_formatter() -> str: ) -def set_root_logger() -> None: +def set_root_logger() -> object: """ Set up root logger and add stream handler and syslog handler - we only want to add these once else it will duplicate log messages to the terminal. 
All loggers named with the same stem as the root logger will use these same syslog handler and stream handler - :return None: + :return logger: Logging object """ - sensitive_formatter=SensitiveFormatter(get_logging_formatter()) + sensitive_formatter = SensitiveFormatter(get_logging_formatter()) logger = logging.getLogger(AdLoggerConfig.REPO_NAME) stream_handler = logging.StreamHandler(sys.stdout) stream_handler.setFormatter(sensitive_formatter) @@ -53,8 +54,9 @@ def set_root_logger() -> None: handlers=[ stream_handler, syslog_handler, - ] + ], ) + return logger def shutdown_logs(logger: logging.Logger) -> None: diff --git a/test/test_ad_logger.py b/ad_logger/test_ad_logger.py similarity index 99% rename from test/test_ad_logger.py rename to ad_logger/test_ad_logger.py index 614eded2..2d5dfe64 100755 --- a/test/test_ad_logger.py +++ b/ad_logger/test_ad_logger.py @@ -1,5 +1,6 @@ """ ad_logger.py pytest unit tests. The test suite is currently incomplete """ + import pytest from toolbox import toolbox from ad_logger import ad_logger @@ -47,5 +48,3 @@ def test_get_loggers(self, logfiles_config, caplog): f"Test log message. Logger {loggers[logger_name].name}" ) assert loggers[logger_name].name in caplog.text - - diff --git a/config/ad_config.py b/config/ad_config.py index bd245f6d..cc6cfb20 100755 --- a/config/ad_config.py +++ b/config/ad_config.py @@ -9,6 +9,7 @@ - ToolboxConfig - URConfig """ + import os import sys import datetime @@ -86,7 +87,9 @@ # DNAnexus upload agent path UPLOAD_AGENT_EXE = f"{DOCUMENT_ROOT}/apps/dnanexus-upload-agent-1.5.17-linux/ua" BCL2FASTQ_DOCKER = "seglh/bcl2fastq2:v2.20.0.422_60dbb5a" -GATK_DOCKER = "broadinstitute/gatk:4.1.8.1" # TODO this image should have a hash added in future +GATK_DOCKER = ( + "broadinstitute/gatk:4.1.8.1" # TODO this image should have a hash added in future +) LANE_METRICS_SUFFIX = ".illumina_lane_metrics" DEMUX_NOT_REQUIRED_MSG = "%s run. Does not need demultiplexing locally" @@ -123,7 +126,7 @@ "ed_readcount": f"{TOOLS_PROJECT}:applet-GbkVzbQ0jy1zBZf5k6Xk6QP7", # ED_readcount_analysis_v1.3.0 "ed_cnvcalling": f"{TOOLS_PROJECT}:applet-GbkVyQ80jy1Xf1p6jpPK6p1x", # ED_cnv_calling_v1.3.0 "rpkm": f"{TOOLS_PROJECT}:applet-FxJj0F00jy1ZVXp36PBz2p1j", # RPKM_using_conifer_v1.6 - "duty_csv": f"{TOOLS_PROJECT}:applet-GkzJfX80jy1fQvPk1z316gBy", # duty_csv_v1.4.0 + "duty_csv": f"{TOOLS_PROJECT}:applet-Gp75GB00360KXPV4Jy7PPFfQ", # duty_csv_v1.5.0 }, "WORKFLOWS": { "pipe": f"{TOOLS_PROJECT}:workflow-GPq04280jy1k1yVkQP0fXqBg", # GATK3.5_v2.18 @@ -262,7 +265,10 @@ DX_CMDS = { "create_proj": 'PROJECT_ID="$(dx new project --bill-to %s "%s" --brief --auth ${AUTH})"', - "find_proj_name": f"{SDK_SOURCE}; dx find projects --name *%s* " "--auth %s | awk '{print $3}'", + "find_proj_name": ( + f"{SDK_SOURCE}; dx find projects --name *%s* " + "--auth %s | awk '{print $3}'" + ), "proj_name_from_id": f"{SDK_SOURCE}; dx describe %s --auth %s --json | jq -r .name", "find_proj_id": f"{SDK_SOURCE}; dx describe %s --auth %s --json | jq -r .id", "find_execution_id": ( @@ -366,7 +372,7 @@ class DemultiplexConfig(PanelConfig): "checksums_do_not_match": "Checksums do not match", # Failure message written to md5sum file by integrity check scripts "samplesheet_success": "Samplesheet check successful with no errors identified: %s", "samplesheet_fail": "Processing halted. SampleSheet contains SampleSheet errors: %s ", - "upload_flag_umis": "Runfolder contains UMIs. Runfolder will not be uploaded and requires manual upload: %s" + "upload_flag_umis": "Runfolder contains UMIs. 
Runfolder will not be uploaded and requires manual upload: %s", } TESTING = TESTING BCL2FASTQ2_CMD = ( @@ -417,7 +423,7 @@ class SWConfig(PanelConfig): RUNFOLDERS = RUNFOLDERS PROD_ORGANISATION = "org-viapath_prod" # Prod org for billing if BRANCH == "main": # Prod branch - + BSPS_ID = "BSPS_MD" DNANEXUS_USERS = { # User access level # TODO remove InterpretationRequest user once per-user accounts set up @@ -538,7 +544,8 @@ class ToolboxConfig(PanelConfig): """ Toolbox configuration """ - if BRANCH == "master": + + if BRANCH == "main": DNANEXUS_PROJECT_PREFIX = "002_" # Denotes production status of run else: DNANEXUS_PROJECT_PREFIX = "003_" # Denotes development status of run @@ -596,3 +603,14 @@ class URConfig: STRINGS = { "upload_started": "Upload started", # Statement to write to DNAnexus upload started file } + + +class RunfolderCleanupConfig(PanelConfig): + """ + Runfolder Cleanup configuration + """ + + TIMESTAMP = TIMESTAMP + RUNFOLDER_PATTERN = RUNFOLDER_PATTERN + RUNFOLDERS = RUNFOLDERS + CREDENTIALS = CREDENTIALS diff --git a/config/log_msgs_config.py b/config/log_msgs_config.py index 6b086ace..9a8749a2 100755 --- a/config/log_msgs_config.py +++ b/config/log_msgs_config.py @@ -37,6 +37,7 @@ "fastq_nonexistent": "No fastq could be intentified that matches the following strings: %s. Error: %s", "sample_excluded": "Sample excluded from samples dictionary due to missing fastqs: %s", "control_sample": "%s control sample detected: %s", + "missing_panno": "Could not identify pan number from the sample name in the sample sheet: %s", "multiple_pipeline_names": ( "Multiple pipeline names detected from panel config for sample list: %s. Scripts do not support different " "pipelines for the same run. Supported pipelines: %s" @@ -45,6 +46,8 @@ "fastq_valid": "Gzip --test determined that the fastq is valid: %s", "fastq_invalid": "Gzip --test determined that the fastq is not valid: %s. Stdout: %s. Stderr: %s", "demux_success": "Demultiplexing was successful for the run with all fastqs valid", + "wes_batch_nos_identified": "WES batch numbers %s identified", + "wes_batch_nos_missing": "WES batch numbers missing. Check for errors in the sample names. Script exited", }, "ad_email": { "sending_email": "Sending the email message: %s", @@ -146,8 +149,6 @@ "upload_rf_error": ( "An error occurred when uploading the rest of the runfolder: %s. See %s and %s for further details. Script exited" ), - "wes_batch_nos_identified": "WES batch numbers %s identified", - "wes_batch_nos_missing": "WES batch numbers missing. Check for errors in the sample names. Script exited", "library_no_err": "Unable to identify library numbers. Script exited. 
Check for underscores in the sample names.", "checking_fastq": "Checking fastq has been collected: %s", "sample_match": "Fastq in the BaseCalls directory matches the sample name in the SampleSheet: %s, %s", diff --git a/config/panel_config.py b/config/panel_config.py index d5ebb660..8032e586 100755 --- a/config/panel_config.py +++ b/config/panel_config.py @@ -47,6 +47,7 @@ dry_lab True if required to share with dry lab, None if not umis True if run has UMIs """ + # TODO in future do we want to swap physical paths for file IDs TOOLS_PROJECT = "project-ByfFPz00jy1fk6PjpZ95F27J" # 001_ToolsReferenceData diff --git a/test/conftest.py b/conftest.py old mode 100755 new mode 100644 similarity index 88% rename from test/conftest.py rename to conftest.py index 24111584..a84f822f --- a/test/conftest.py +++ b/conftest.py @@ -1,7 +1,8 @@ """ Variables used across test modules, including the setup and teardown fixture -that is run before and after every test +that is run before and after every test. This is the top-level testing configuration """ + import os import re import shutil @@ -9,22 +10,40 @@ import tarfile import logging from shutil import copy + # sys.path.append("..") from ad_logger import ad_logger from toolbox import toolbox from config import ad_config -# Variables used across test classes - -# TODO prevent logging writing to syslog when in testing mode - - test_data_dir = os.path.abspath("data") # Data directory test_data_dir_unzipped = os.path.join( test_data_dir, "data_unzipped/" ) # Unzips data tar to here test_data_temp = os.path.abspath("temp") # Copies data to here for each test -# Place interop in test 7, test 9, test 11 + +temp_log_dir = os.path.join(test_data_temp, "automate_demultiplexing_logfiles") +temp_samplesheet_logdir = os.path.join( + temp_log_dir, "samplesheet_validator_script_logfiles" +) + +# TODO prevent logging writing to syslog when in testing mode +source_runfolder_dirs = os.path.join( + test_data_dir_unzipped, "demultiplex_test_files/test_runfolders/" +) + + +temp_runfolderdir = os.path.join( + test_data_temp, "data_unzipped/demultiplex_test_files/test_runfolders/" +) + + +to_copy_interop_to = [ + os.path.join(source_runfolder_dirs, "999999_A01229_0000_00000TEST7/InterOp/"), + os.path.join(source_runfolder_dirs, "999999_A01229_0000_00000TEST9/InterOp/"), + os.path.join(source_runfolder_dirs, "999999_A01229_0000_0000TEST11/InterOp/"), +] + data_tars = [ { "src": os.path.join(test_data_dir, "demultiplex_test_files.tar.gz"), @@ -47,31 +66,16 @@ "dest": os.path.join(test_data_dir_unzipped, "InterOp"), }, ] -source_runfolder_dirs = os.path.join( - test_data_dir_unzipped, "demultiplex_test_files/test_runfolders/" -) -to_copy_interop_to = [ - os.path.join(source_runfolder_dirs, "999999_A01229_0000_00000TEST7/InterOp/"), - os.path.join(source_runfolder_dirs, "999999_A01229_0000_00000TEST9/InterOp/"), - os.path.join(source_runfolder_dirs, "999999_A01229_0000_0000TEST11/InterOp/"), -] - -temp_runfolderdir = os.path.join( - test_data_temp, "data_unzipped/demultiplex_test_files/test_runfolders/" -) -temp_log_dir = os.path.join(test_data_temp, "automate_demultiplexing_logfiles") -temp_samplesheet_logdir = os.path.join( - temp_log_dir, "samplesheet_validator_script_logfiles" -) -# Temp directory for SampleSheet validator SampleSheet test cases -sv_samplesheet_temp_dir = os.path.join(test_data_temp, "data_unzipped/samplesheets") - -@pytest.fixture(scope="function") -def logger_obj(): - temp_log = os.path.join(test_data_temp, "temp.log") - return ad_logger.AdLogger(__name__, 
"demux", temp_log).get_logger() +def patch_toolbox(monkeypatch): + """ + Apply patches required for toolbox script. These point the paths to the + temporary locations: + - Test logfiles in the temp logfiles dir and within the temp runfolder dirs + """ + monkeypatch.setattr(toolbox.ToolboxConfig, "RUNFOLDERS", temp_runfolderdir) + monkeypatch.setattr(toolbox.ToolboxConfig, "AD_LOGDIR", temp_log_dir) def create_logdirs(): @@ -86,16 +90,6 @@ def create_logdirs(): os.makedirs(parent_dir, exist_ok=True) -def patch_toolbox(monkeypatch): - """ - Apply patches required for toolbox script. These point the paths to the - temporary locations: - - Test logfiles in the temp logfiles dir and within the temp runfolder dirs - """ - monkeypatch.setattr(toolbox.ToolboxConfig, "RUNFOLDERS", temp_runfolderdir) - monkeypatch.setattr(toolbox.ToolboxConfig, "AD_LOGDIR", temp_log_dir) - - @pytest.fixture(scope="session", autouse=True) def run_before_and_after_session(): """ @@ -106,14 +100,15 @@ def run_before_and_after_session(): os.makedirs( test_data_dir_unzipped, exist_ok=True ) # Holds the unzipped data to copy from for each test - for tar in data_tars: with tarfile.open(tar["src"], "r:gz") as open_tar: open_tar.extractall(path=tar["dest"]) for destination in to_copy_interop_to: shutil.copytree(os.path.join(test_data_dir_unzipped, "InterOp"), destination) - test_data_unzipped = os.path.join(test_data_dir_unzipped, "demultiplex_test_files", "test_runfolders") + test_data_unzipped = os.path.join( + test_data_dir_unzipped, "demultiplex_test_files", "test_runfolders" + ) directories = [ os.path.join(test_data_unzipped, d) @@ -124,7 +119,9 @@ def run_before_and_after_session(): for directory in directories: if re.match(".*999999_.*", directory): - fastqs_dir = os.path.join(test_data_unzipped, directory, "Data", "Intensities", "BaseCalls/") + fastqs_dir = os.path.join( + test_data_unzipped, directory, "Data", "Intensities", "BaseCalls/" + ) os.makedirs(fastqs_dir, exist_ok=True) copy(dummy_fastq, fastqs_dir) yield # Where the testing happens diff --git a/data/test_dir_1_fastqs.txt b/data/test_dir_1_fastqs.txt new file mode 100644 index 00000000..c4c09b58 --- /dev/null +++ b/data/test_dir_1_fastqs.txt @@ -0,0 +1,8 @@ +TSTRUN01_01_000000_000000_TEST_Pan5180_S1_R1_001.fastq.gz +TSTRUN01_01_000000_000000_TEST_Pan5180_S1_R2_001.fastq.gz +TSTRUN01_02_000000_000000_TEST_Pan5180_S1_R1_001.fastq.gz +TSTRUN01_02_000000_000000_TEST_Pan5180_S1_R2_001.fastq.gz +TSTRUN01_03_000000_000000_TEST_Pan5180_S1_R1_001.fastq.gz +TSTRUN01_03_000000_000000_TEST_Pan5180_S1_R2_001.fastq.gz +TSTRUN01_04_000000_000000_TEST_Pan5180_S1_R1_001.fastq.gz +TSTRUN01_04_000000_000000_TEST_Pan5180_S1_R2_001.fastq.gz \ No newline at end of file diff --git a/data/test_dir_2_fastqs.txt b/data/test_dir_2_fastqs.txt new file mode 100644 index 00000000..c764f6ef --- /dev/null +++ b/data/test_dir_2_fastqs.txt @@ -0,0 +1,8 @@ +TSTRUN02_01_000000_000000_TEST_Pan5180_S1_R1_001.fastq.gz +TSTRUN02_01_000000_000000_TEST_Pan5180_S1_R2_001.fastq.gz +TSTRUN02_02_000000_000000_TEST_Pan5180_S1_R1_001.fastq.gz +TSTRUN02_02_000000_000000_TEST_Pan5180_S1_R2_001.fastq.gz +TSTRUN02_03_000000_000000_TEST_Pan5180_S1_R1_001.fastq.gz +TSTRUN02_03_000000_000000_TEST_Pan5180_S1_R2_001.fastq.gz +TSTRUN02_04_000000_000000_TEST_Pan5180_S1_R1_001.fastq.gz +TSTRUN02_04_000000_000000_TEST_Pan5180_S1_R2_001.fastq.gz \ No newline at end of file diff --git a/demultiplex/README.md b/demultiplex/README.md index 3f52c89c..7df86c6a 100755 --- a/demultiplex/README.md +++ 
b/demultiplex/README.md
@@ -83,7 +83,60 @@ Logging is performed using [ad_logger](../ad_logger/ad_logger.py).

**N.B. Tests and test cases/files MUST be maintained and updated accordingly in conjunction with script development**

-Test datasets are stored in [/test/data](../test/data). The script has a full test suite:
-* [test_demultiplex.py](../test/test_demultiplex.py)
+Test datasets are stored in [/test/data](../test/data). The script has a test suite:
+* [test_demultiplex.py](test_demultiplex.py)

These tests should be run before pushing any code to ensure all tests in the GitHub Actions workflow pass.
+
+## Demultiplex.py tests
+
+This directory contains test files used in the demultiplex test suite.
+
+test_runfolders contains runfolders used to test GetRunfolders().rundemultiplexrunfolders(),
+DemultiplexRunfolder.run_demultiplexing(), DemultiplexRunfolder.check_demultiplexing_required(),
+and GetRunfolders.loop_through_runs().
+
+The test cases are described below.
+
+### Test SampleSheets
+
+Lone SampleSheet test cases are detailed below. These have been created for the purpose of testing the SampleSheet-related functions in the demultiplex script (valid_samplesheet and no_disallowed_sserrs). The test cases are as follows:
+
+#### [Valid SampleSheets](data/samplesheets/valid)
+
+| SampleSheet name | Run Type |
+| ---- | -------- |
+| 210408_M02631_0186_000000000-JFMNK_SampleSheet.csv | SNP | # DONE
+| 210917_NB551068_0409_AH3YNFAFX3_SampleSheet.csv | Custom Panel | # DONE
+| 221021_A01229_0145_BHGGTHDMXY_SampleSheet.csv | TSO500 | # DONE
+| 221024_A01229_0146_BHKGG2DRX2_SampleSheet.csv | WES Skin | # DONE
+
+#### [Invalid SampleSheets](data/samplesheets/invalid/)
+# TODO check if these cover all cases
+
+| SampleSheet Name | Details | Expected behaviour |
+| ---- | ------- | ------------------ |
+| 21aA08_A01229_0040_AHKGTFDRXY_SampleSheet.csv | Empty SampleSheet with invalid name (letter in date) |
+| 21108_A01229_0040_AHKGTFDRXY_SampleSheet.csv | Empty SampleSheet with invalid name (date too short) |
+| 220413_A01229_0032_AHGKBIEKFR_SampleSheet.csv | Empty SampleSheet |
+| 200817_NB068_0009_AH3YERAFX3_SampleSheet.csv | Custom Panel SampleSheet with invalid name (invalid sequencer ID), invalid contents (invalid header, invalid sample names, non-matching sample names, invalid pan number, invalid runtype) | # DONE
+| 210513_M02631_0236_000000000-JFMNK_SampleSheet.csv | SNP SampleSheet with invalid characters in the sample name |
+| 220404_B01229_0348_HFGIFEIOPY_SampleSheet.csv | TSO SampleSheet with invalid name (invalid sequencer ID), invalid contents (invalid header, invalid sample names, non-matching sample names, invalid pan number, invalid runtype) | # DONE
+| 220408_A02631_0186_000000000-JLJFE_SampleSheet.csv | SNP SampleSheet with invalid contents (invalid header, invalid sample names, non-matching sample names, invalid pan number, invalid runtype) | # DONE
+| 2110915_M02353_0632_000000000-K242J_SampleSheet.csv | SNP SampleSheet with invalid name (date too long), invalid contents (invalid header, invalid sample names, non-matching sample names, invalid pan number, invalid runtype) | # DONE
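To show how these lone SampleSheet cases are exercised, here is a hedged sketch based on the valid_samplesheet() logic visible later in this diff. `SamplesheetCheck`, `ss_checks()`, `errors` and `errors_dict` all appear in the diff; the import form and the constructor arguments other than the path are elided in the diff and therefore assumptions here.

```python
import samplesheet_validator  # Import form assumed; the diff calls samplesheet_validator.SamplesheetCheck


def check_samplesheet(samplesheet_path: str) -> bool:
    """Return True if the SampleSheet passes validation, else report errors"""
    sscheck_obj = samplesheet_validator.SamplesheetCheck(
        samplesheet_path,
        # Remaining constructor arguments are elided in the diff, so they are
        # omitted from this sketch
    )
    sscheck_obj.ss_checks()  # Run the suite of SampleSheet checks
    if sscheck_obj.errors:
        # Flatten the per-check error lists, as valid_samplesheet() does
        err_str = ". ".join(", ".join(v) for v in sscheck_obj.errors_dict.values())
        print(f"SampleSheet errors: {err_str}")
        return False
    return True
```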
+### test_runfolders
+| Runfolder | Details | Expected behaviour |
+| --------- | ------- | ------------------ |
+| 999999_A01229_0000_00000TEST1 | bcl2fastq2_output.log (Demultiplexing already complete) | demultiplexing_required returns False |
+| 999999_A01229_0000_00000TEST2 | No flag files (Sequencing not finished) | demultiplexing_required returns False |
+| 999999_A01229_0000_00000TEST3 | RTAComplete.txt, invalid SampleSheet present in test samplesheet dir with disallowed errors that would cause demultiplexing to fail (Sequencing complete but no processing has taken place yet) | demultiplexing_required returns False |
+| 999999_M02631_0000_00000TEST4 | RTAComplete.txt, matching valid SampleSheet present in test samplesheet dir, InterOp and RunInfo.xml files for Picard CollectIlluminaLaneMetrics calculation, integrity check not required (Sequencing complete but no processing has taken place yet) | demultiplexing_required returns True |
+| 999999_A01229_0000_00000TEST5 | RTAComplete.txt, matching valid SampleSheet present in test samplesheet dir, integrity check required, but no checksum file (Sequencing complete but no processing has taken place yet) | demultiplexing_required returns False |
+| 999999_A01229_0000_00000TEST6 | RTAComplete.txt, matching valid SampleSheet present in test samplesheet dir, integrity check required, md5checksum.txt present and contains integrity check fail string (Sequencing complete but no processing has taken place yet, previous integrity check has failed) | demultiplexing_required returns False |
+| 999999_A01229_0000_00000TEST7 | RTAComplete.txt (sequencing complete), matching valid SampleSheet present in test samplesheet dir, InterOp and RunInfo.xml files for Picard CollectIlluminaLaneMetrics calculation, integrity check required, md5checksum.txt present and contains matching checksums but no previously checked checksums string (Sequencing complete but no processing has taken place yet, integrity check passed) | demultiplexing_required returns True |
+| 999999_A01229_0000_00000TEST8 | Matching valid SampleSheet present in samplesheet dir containing TSO samples | run_demultiplexing returns False, self.run_processed == True |
+| 999999_A01229_0000_00000TEST9 | RTAComplete.txt (sequencing complete), matching valid SampleSheet present in samplesheet dir containing non-TSO samples | run_demultiplexing returns True, self.run_processed == True (bcl2fastq2 command replaced by a dummy command) |
+| 999999_A01229_0000_0000TEST10 | RTAComplete.txt (sequencing complete), SampleSheet missing, integrity check not required (md5checksum.txt present and contains matching checksums with a previously checked checksums string - processing has taken place, integrity check passed) | demultiplexing_required returns False |
+| 999999_A01229_0000_0000TEST11 | RTAComplete.txt, SampleSheet present and contains TSO samples, integrity check required (md5checksum.txt present and contains matching checksums but no previously checked checksums string - no processing has taken place) | demultiplexing_required returns True |
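Several of the expected behaviours above hinge on message strings written to flag files (md5checksum.txt and the SampleSheet check flag file). The demultiplex.py changes below standardise how those messages are matched: the configured template is reduced to its pre-colon prefix and searched for in each line. A minimal sketch of that pattern, using a template string taken from this diff (the flag-file contents are illustrative):

```python
# Template from DemultiplexConfig.STRINGS in this diff; the %s is filled in
# when the message is written to the flag file
FAIL_TEMPLATE = "Processing halted. SampleSheet contains SampleSheet errors: %s"


def message_present(flagfile_lines: list, template: str) -> bool:
    """Return True if any line contains the template's pre-colon prefix"""
    prefix = template.split(":")[0]  # Same split(":")[0] idiom as the diff
    return any(prefix in line for line in flagfile_lines)


lines = ["Processing halted. SampleSheet contains SampleSheet errors: bad header"]
print(message_present(lines, FAIL_TEMPLATE))  # True
```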
diff --git a/demultiplex/demultiplex.py index 8e758d02..498e5aa8 100755 --- a/demultiplex/demultiplex.py +++ b/demultiplex/demultiplex.py @@ -9,10 +9,10 @@ demultiplexed and a valid SampleSheet is present """ + import sys import os import re -import logging import datetime from importlib.metadata import version from shutil import copyfile @@ -126,13 +126,16 @@ def check_run_processed(self, dr_obj: object, runfolder_name: str) -> None: :param runfolder_name (str): Runfolder name string :return None: """ - if dr_obj.run_processed: # If runfolder has been processed during this script run + if ( + dr_obj.run_processed + ): # If runfolder has been processed during this script run script_logger.info( script_logger.log_msgs["script_success"], runfolder_name, ) return True + class DemultiplexRunfolder(DemultiplexConfig): """ Call bcl2fastq2 on runfolders after asserting that runfolder has not been @@ -193,7 +196,7 @@ class DemultiplexRunfolder(DemultiplexConfig): prior_ic(checksums) Determines whether an integrity check has been previously performed by this script - checksum_match_message(checksums) + checksum_match_message(checksums) Determine whether the md5sum file contains the checksums match message checksums_match() Reads the md5checksum file and checks for the presence of the CHECKSUM_MATCH_MSG / @@ -202,7 +205,7 @@ class DemultiplexRunfolder(DemultiplexConfig): write_checksums_assessed() Write DemultiplexConfig.CHECKSUMS_ALREADY_ASSESSED message to file to prevent script performing checks on future runs of the script - checksums_do_not_match_message(checksums) + checksums_do_not_match_message(checksums) Determine whether the md5sum file contains the checksums do not match message calculate_cluster_density() Run dockerised GATK to run Picard CollectIlluminaLaneMetrics - this calculates @@ -222,9 +225,7 @@ class DemultiplexRunfolder(DemultiplexConfig): Copy file from source path to dest path """ - def __init__( - self, folder_name: str, timestamp: str - ): + def __init__(self, folder_name: str, timestamp: str): """ Constructor for the DemultiplexRunfolder class :param folder_name(str): Runfolder name @@ -270,11 +271,13 @@ def setoff_workflow(self) -> Optional[bool]: if self.create_bcl2fastqlog(): if self.run_demultiplexing(): self.run_processed = True - rf_samples_obj = RunfolderSamples(self.rf_obj, self.loggers["demux"]) + rf_samples_obj = RunfolderSamples( + self.rf_obj, self.loggers["demux"] + ) if rf_samples_obj.pipeline == "oncodeep": self.copy_file( self.rf_obj.masterfile_path, - self.rf_obj.runfolder_masterfile_path + self.rf_obj.runfolder_masterfile_path, ) self.run_processed = True return True @@ -292,19 +295,21 @@ def demultiplexing_required(self) -> Optional[bool]: :return None: """ if self.upload_flagfile_absent() and self.bcl2fastqlog_absent(): - if not self.previous_samplesheet_check_fail(): + if not self.previous_samplesheet_check_fail(): self.demux_rf_logger.info( self.demux_rf_logger.log_msgs["ad_version"], git_tag(), ) - if self.sscheck_success_msg_present() or self.valid_samplesheet(): # Early warning ss checks + if ( + self.sscheck_success_msg_present() or self.valid_samplesheet() + ): # Early warning ss checks requires_no_ic = self.seq_requires_no_ic() if requires_no_ic or self.checksumfile_exists(): if self.sequencing_complete(): if requires_no_ic or self.pass_integrity_check(): self.copy_file( self.rf_obj.samplesheet_path, - self.rf_obj.runfolder_samplesheet_path + self.rf_obj.runfolder_samplesheet_path, ) self.calculate_cluster_density() if self.runtype_requires_demultiplexing(): @@ -350,7 +355,10 @@ def previous_samplesheet_check_fail(self)
-> Optional[bool]: """ if self.previous_samplesheet_check(): sscheckfile_contents = read_lines(self.rf_obj.sscheck_flagfile_path) - if any(DemultiplexConfig.STRINGS["samplesheet_fail"].split(':')[0] in line for line in sscheckfile_contents): + if any( + DemultiplexConfig.STRINGS["samplesheet_fail"].split(":")[0] in line + for line in sscheckfile_contents + ): script_logger.info( script_logger.log_msgs["previous_ss_check_fail"], self.rf_obj.sscheck_flagfile_path, @@ -377,7 +385,10 @@ def sscheck_success_msg_present(self) -> Optional[bool]: if self.previous_samplesheet_check(): sscheckfile_contents = read_lines(self.rf_obj.sscheck_flagfile_path) read_lines(self.rf_obj.sscheck_flagfile_path) - if any(DemultiplexConfig.STRINGS["samplesheet_success"].split(':')[0] in line for line in sscheckfile_contents): + if any( + DemultiplexConfig.STRINGS["samplesheet_success"].split(":")[0] in line + for line in sscheckfile_contents + ): self.demux_rf_logger.info( self.demux_rf_logger.log_msgs["sscheck_success_msg_present"], ) @@ -396,11 +407,10 @@ def valid_samplesheet(self) -> Tuple[bool, object]: is valid), and SampleSheetCheck object containing any errors identified """ + script_logger.info(script_logger.log_msgs["ss_check_required"]) script_logger.info( - script_logger.log_msgs["ss_check_required"] - ) - script_logger.info( - script_logger.log_msgs["ss_validator_version"], version('samplesheet_validator') + script_logger.log_msgs["ss_validator_version"], + version("samplesheet_validator"), ) sscheck_obj = samplesheet_validator.SamplesheetCheck( self.rf_obj.samplesheet_path, @@ -413,7 +423,7 @@ def valid_samplesheet(self) -> Tuple[bool, object]: sscheck_obj.ss_checks() shutdown_logs(sscheck_obj.logger) self.tso = sscheck_obj.tso - err_str = '. '.join(', '.join(v) for v in sscheck_obj.errors_dict.values()) + err_str = ". 
".join(", ".join(v) for v in sscheck_obj.errors_dict.values()) if sscheck_obj.errors: self.demux_rf_logger.error( @@ -423,7 +433,7 @@ def valid_samplesheet(self) -> Tuple[bool, object]: write_lines( self.rf_obj.sscheck_flagfile_path, "w", - DemultiplexConfig.STRINGS['samplesheet_fail'] % err_str, + DemultiplexConfig.STRINGS["samplesheet_fail"] % err_str, ) return False else: @@ -434,7 +444,8 @@ def valid_samplesheet(self) -> Tuple[bool, object]: write_lines( self.rf_obj.sscheck_flagfile_path, "w", - DemultiplexConfig.STRINGS["samplesheet_success"] % datetime.datetime.now(), + DemultiplexConfig.STRINGS["samplesheet_success"] + % datetime.datetime.now(), ) return True @@ -499,12 +510,16 @@ def pass_integrity_check(self) -> Optional[bool]: """ if self.checksumfile_exists(): checksums = read_lines(self.rf_obj.checksumfile_path) - if self.prior_ic(checksums): # If the checksums already checked message is there - if self.checksum_match_message(checksums): # If the checksums match message is there + if self.prior_ic( + checksums + ): # If the checksums already checked message is there + if self.checksum_match_message( + checksums + ): # If the checksums match message is there return True # Checksums match so integrity check passed else: - return False # We don't want script to continue if there is no checksum success message - else: # If the checksums already checked message is not present + return False # We don't want script to continue if there is no checksum success message + else: # If the checksums already checked message is not present return self.checksums_match() # Check for the checksums match message def prior_ic(self, checksums: list) -> Optional[bool]: @@ -519,7 +534,10 @@ def prior_ic(self, checksums: list) -> Optional[bool]: :return (Optional[bool]): Returns true if the checksum file has previously been checked for the success message by the script """ - if DemultiplexConfig.STRINGS["checksums_assessed"].split(':')[0] in checksums[-1]: + if ( + DemultiplexConfig.STRINGS["checksums_assessed"].split(":")[0] + in checksums[-1] + ): self.demux_rf_logger.info( self.demux_rf_logger.log_msgs["checksumfile_checked"] ) @@ -557,7 +575,7 @@ def checksums_match(self) -> Optional[bool]: ) checksums = read_lines(self.rf_obj.checksumfile_path) self.write_checksums_assessed() - + if self.checksum_match_message(checksums): return True elif self.checksums_do_not_match_message(checksums): @@ -577,7 +595,7 @@ def write_checksums_assessed(self) -> None: write_lines( self.rf_obj.checksumfile_path, "a", - DemultiplexConfig.STRINGS['checksums_assessed'] % datetime.datetime.now(), + DemultiplexConfig.STRINGS["checksums_assessed"] % datetime.datetime.now(), ) def checksums_do_not_match_message(self, checksums: list) -> Optional[bool]: @@ -610,8 +628,7 @@ def copy_file(self, source_path: str, dest_path: str) -> None: ) else: self.demux_rf_logger.error( - self.demux_rf_logger.log_msgs["file_copy_fail"], - source_path + self.demux_rf_logger.log_msgs["file_copy_fail"], source_path ) sys.exit(1) @@ -662,17 +679,26 @@ def runtype_requires_demultiplexing(self) -> Optional[bool]: :return (Optional[bool]): True if requires automated processing, else None """ samplesheet = read_lines(self.rf_obj.runfolder_samplesheet_path) - if any(any(pannum in line for line in samplesheet) for pannum in DemultiplexConfig.UMI_DEV_PANEL): + if any( + any(pannum in line for line in samplesheet) + for pannum in DemultiplexConfig.UMI_DEV_PANEL + ): self.demux_rf_logger.info(self.demux_rf_logger.log_msgs["dev_run_umis"]) 
self.create_bcl2fastqlog() self.add_bcl2fastqlog_msg("DEV UMIs") write_lines( # Create upload started log file to prevent automated upload self.rf_obj.upload_flagfile, "a", - DemultiplexConfig.STRINGS['upload_flag_umis'] % datetime.datetime.now(), + DemultiplexConfig.STRINGS["upload_flag_umis"] % datetime.datetime.now(), + ) + self.demux_rf_logger.info( + self.demux_rf_logger.log_msgs["dev_umis_upload_flagfile"], + self.rf_obj.runfolder_name, ) - self.demux_rf_logger.info(self.demux_rf_logger.log_msgs["dev_umis_upload_flagfile"], self.rf_obj.runfolder_name) - elif any(any(pannum in line for line in samplesheet) for pannum in DemultiplexConfig.TSO_PANELS): + elif any( + any(pannum in line for line in samplesheet) + for pannum in DemultiplexConfig.TSO_PANELS + ): self.create_bcl2fastqlog() # Create bcl2fastq2 log to prevent scripts processing this run self.add_bcl2fastqlog_msg("TSO500") self.demux_rf_logger.info(self.demux_rf_logger.log_msgs["tso_run"]) @@ -740,7 +766,7 @@ def run_demultiplexing(self) -> Optional[bool]: if validate_fastqs(self.rf_obj.fastq_dir_path, self.demux_rf_logger): self.demux_rf_logger.info( self.demux_rf_logger.log_msgs["bcl2fastq_complete"], - self.rf_obj.runfolder_name + self.rf_obj.runfolder_name, ) self.bcl2fastq2_rf_logger.info( err # Write stderr to bcl2fastq2 runfolder logfile @@ -750,7 +776,9 @@ def run_demultiplexing(self) -> Optional[bool]: os.remove( self.rf_obj.bcl2fastqlog_file ) # Bcl2fastq log file removed to trigger re-demultiplex - self.demux_rf_logger.error(self.demux_rf_logger.log_msgs["re_demultiplex"]) + self.demux_rf_logger.error( + self.demux_rf_logger.log_msgs["re_demultiplex"] + ) else: self.demux_rf_logger.error( self.demux_rf_logger.log_msgs["bcl2fastq_failed"], diff --git a/test/test_demultiplex.py b/demultiplex/test_demultiplex.py similarity index 95% rename from test/test_demultiplex.py rename to demultiplex/test_demultiplex.py index 8cb79a85..c45403f4 100755 --- a/test/test_demultiplex.py +++ b/demultiplex/test_demultiplex.py @@ -1,19 +1,25 @@ """ demultiplex.py pytest unit tests. The test suite is currently incomplete """ + import os import itertools import pytest from demultiplex import demultiplex from config import ad_config -from . import conftest +from .. 
import conftest from ad_logger import ad_logger from pytest_cases import fixture_union +from ..conftest import test_data_temp # TODO finish this test suite as it is currently incomplete +# Temp directory for SampleSheet validator SampleSheet test cases +sv_samplesheet_temp_dir = os.path.join(test_data_temp, "data_unzipped/samplesheets") + + def get_dr_obj(runfolder): """""" dr_obj = demultiplex.DemultiplexRunfolder(runfolder, ad_config.TIMESTAMP) @@ -36,22 +42,22 @@ def valid_samplesheets(): """ return [ os.path.join( - conftest.sv_samplesheet_temp_dir, + sv_samplesheet_temp_dir, "valid", "210408_M02631_0186_000000000-JFMNK_SampleSheet.csv", ), os.path.join( - conftest.sv_samplesheet_temp_dir, + sv_samplesheet_temp_dir, "valid", "210917_NB551068_0409_AH3YNFAFX3_SampleSheet.csv", ), os.path.join( - conftest.sv_samplesheet_temp_dir, + sv_samplesheet_temp_dir, "valid", "221021_A01229_0145_BHGGTHDMXY_SampleSheet.csv", ), os.path.join( - conftest.sv_samplesheet_temp_dir, + sv_samplesheet_temp_dir, "valid", "221024_A01229_0146_BHKGG2DRX2_SampleSheet.csv", ), @@ -65,15 +71,15 @@ def invalid_paths(): """ return [ os.path.join( - conftest.sv_samplesheet_temp_dir, + sv_samplesheet_temp_dir, "210408_M02631_0186_000000000-JFMNN_SampleSheet.csv", ), os.path.join( - conftest.sv_samplesheet_temp_dir, + sv_samplesheet_temp_dir, "210918_NB551068_551068_0409_AH3YNFAFX3_SampleSheet.csv", ), os.path.join( - conftest.sv_samplesheet_temp_dir, + sv_samplesheet_temp_dir, "221021_A01229_0143_BHGGTHDMXY_SampleSheet.csv", ), ] @@ -86,27 +92,27 @@ def invalid_names(): """ return [ os.path.join( - conftest.sv_samplesheet_temp_dir, + sv_samplesheet_temp_dir, "invalid", "21108_A01229_0040_AHKGTFDRXY_SampleSheet.csv", ), os.path.join( - conftest.sv_samplesheet_temp_dir, + sv_samplesheet_temp_dir, "invalid", "21aA08_A01229_0040_AHKGTFDRXY_SampleSheet.csv", ), os.path.join( - conftest.sv_samplesheet_temp_dir, + sv_samplesheet_temp_dir, "invalid", "2110915_M02353_0632_000000000-K242J_SampleSheet.csv", ), os.path.join( - conftest.sv_samplesheet_temp_dir, + sv_samplesheet_temp_dir, "invalid", "200817_NB068_0009_AH3YERAFX3_SampleSheet.csv", ), os.path.join( - conftest.sv_samplesheet_temp_dir, + sv_samplesheet_temp_dir, "invalid", "220404_B01229_0348_HFGIFEIOPY_SampleSheet.csv", ), @@ -120,7 +126,7 @@ def empty_file(): """ return [ os.path.join( - conftest.sv_samplesheet_temp_dir, + sv_samplesheet_temp_dir, "invalid", "220413_A01229_0032_AHGKBIEKFR_SampleSheet.csv", ) @@ -135,17 +141,17 @@ def invalid_contents(): """ return [ os.path.join( - conftest.sv_samplesheet_temp_dir, + sv_samplesheet_temp_dir, "invalid", "220404_B01229_0348_HFGIFEIOPY_SampleSheet.csv", ), os.path.join( - conftest.sv_samplesheet_temp_dir, + sv_samplesheet_temp_dir, "invalid", "220408_A02631_0186_000000000-JLJFE_SampleSheet.csv", ), os.path.join( - conftest.sv_samplesheet_temp_dir, + sv_samplesheet_temp_dir, "invalid", "200817_NB068_0009_AH3YERAFX3_SampleSheet.csv", ), @@ -231,7 +237,9 @@ def test_setoff_processing_nottoproc(self, runfolders_nottoproc, monkeypatch): that none have been processed """ monkeypatch.setattr( - demultiplex.DemultiplexConfig, "DEMULTIPLEX_TEST_RUNFOLDERS", runfolders_nottoproc + demultiplex.DemultiplexConfig, + "DEMULTIPLEX_TEST_RUNFOLDERS", + runfolders_nottoproc, ) gr_obj = get_gr_obj() with pytest.raises(SystemExit) as pytest_wrapped_e: @@ -314,7 +322,7 @@ def internal_chars_invalid(self): """ return [ os.path.join( - conftest.sv_samplesheet_temp_dir, + sv_samplesheet_temp_dir, "invalid", 
"210513_M02631_0236_000000000-JFMNK_SampleSheet.csv", ), @@ -326,7 +334,7 @@ def perfect_ss(self): Path to perfect SampleSheet """ return os.path.join( - conftest.sv_samplesheet_temp_dir, + sv_samplesheet_temp_dir, "valid", "210408_M02631_0186_000000000-JFMNK_SampleSheet.csv", ) @@ -489,8 +497,7 @@ def non_tso_runfolder(self): @pytest.fixture(scope="function") def checksums_checked(self): - """ - """ + """ """ return [ "Checksums match after 1 hours", "Checksums assessed by AS", @@ -498,8 +505,7 @@ def checksums_checked(self): @pytest.fixture(scope="function") def checksums_not_checked(self): - """ - """ + """ """ return [ "Checksums match after 1 hours", ] @@ -612,7 +618,9 @@ def disallowed_sserrs_pass(self, monkeypatch, ss_with_disallowed_sserrs): Tests function identifies all disallowed ss errors """ dr_obj = get_dr_obj("") - monkeypatch.setattr(dr_obj.rf_obj, "samplesheet_path", ss_with_disallowed_sserrs) + monkeypatch.setattr( + dr_obj.rf_obj, "samplesheet_path", ss_with_disallowed_sserrs + ) valid, sscheck_obj = dr_obj.valid_samplesheet() assert dr_obj.no_disallowed_sserrs(valid, sscheck_obj) ad_logger.shutdown_logs(dr_obj.demux_rf_logger) diff --git a/img/classes_automate_demultiplex.png b/img/classes_automate_demultiplex.png index bddacf1e..cdbe472d 100755 Binary files a/img/classes_automate_demultiplex.png and b/img/classes_automate_demultiplex.png differ diff --git a/img/packages_automate_demultiplex.png b/img/packages_automate_demultiplex.png index e4597ce2..0cfb3282 100755 Binary files a/img/packages_automate_demultiplex.png and b/img/packages_automate_demultiplex.png differ diff --git a/requirements.txt b/requirements.txt index aca2a8cd..734ed251 100755 --- a/requirements.txt +++ b/requirements.txt @@ -10,7 +10,7 @@ coverage==7.5.4 cryptography==36.0.2 decopatch==1.4.10 dill==0.3.8 -dxpy==0.338.1 +dxpy==0.378.0 exceptiongroup==1.2.1 flake8==6.1.0 graphviz==0.20.3 @@ -44,5 +44,5 @@ six==1.16.0 tomli==2.0.1 tomlkit==0.12.5 typing_extensions==4.12.2 -urllib3==1.26.19 -websocket-client==0.54.0 +websocket-client==1.7.0 +urllib3==1.26.15 \ No newline at end of file diff --git a/setoff_workflows/build_dx_commands.py b/setoff_workflows/build_dx_commands.py index bc18fca5..942fab79 100644 --- a/setoff_workflows/build_dx_commands.py +++ b/setoff_workflows/build_dx_commands.py @@ -6,6 +6,7 @@ - BuildSampleDxCommands Build dx run commands commands that are run at the sample level """ + import logging from typing import Union from config.ad_config import SWConfig @@ -50,6 +51,7 @@ class BuildRunfolderDxCommands(SWConfig): return_wes_query() Return WES SQL query. This is a single update query per-run """ + def __init__(self, rf_obj: object, logger: logging.Logger): """ Constructor for the BuildRunfolderDxCommands class @@ -300,7 +302,9 @@ def create_duty_csv_cmd(self) -> str: ] ) - def return_wes_query(self, wes_dnanumbers: list) -> str: # TODO eventually remove this + def return_wes_query( + self, wes_dnanumbers: list + ) -> str: # TODO eventually remove this """ Return WES SQL query. 
This is a single update query per-run :param wes_dnanumbers (list): List of DNA numbers @@ -322,7 +326,7 @@ class BuildSampleDxCommands(SWConfig): Build dx run commands commands that are run at the sample level Attributes: - + sample_dict (dict): Dictionary of SampleObject per sample, containing sample-specific attributes runfolder_name (str): Runfolder name @@ -360,7 +364,7 @@ class BuildSampleDxCommands(SWConfig): support tool upload bash script build_qiagen_upload_cmd() Build the command to write the qiagen upload command to the decisions support - tool upload bash script + tool upload bash script build_oncodeep_upload_cmd(file_name, run_identifier, file) Build the command to write the OncoDEEP upload dx run command to the decision support tool upload bash script @@ -387,9 +391,7 @@ def __init__( self.sample_dict = sample_dict self.runfolder_name = runfolder_name self.logger = logger - self.logger.info( - self.logger.log_msgs["building_cmds"] - ) + self.logger.info(self.logger.log_msgs["building_cmds"]) def create_pipe_cmd(self) -> str: """ @@ -407,7 +409,9 @@ def create_pipe_cmd(self) -> str: self.sample_dict["sample_name"], ) # Specify instance type for human exome app - if self.sample_dict["panel_settings"]["FH"]: # Larger instance required for FH samples + if self.sample_dict["panel_settings"][ + "FH" + ]: # Larger instance required for FH samples GATK_INSTANCE = "mem3_ssd1_v2_x16" else: GATK_INSTANCE = "mem1_ssd1_v2_x8" @@ -456,9 +460,7 @@ def get_vcfeval_cmd_string(self) -> str: NA12878 we want to skip the vcfeval stage (the app default is skip=False) :return (str): App input string """ - prefix_str = ( # Set prefix as samplename - f'{SWConfig.STAGE_INPUTS["pipe"]["happy_prefix"]}{self.sample_dict["sample_name"]}' - ) + prefix_str = f'{SWConfig.STAGE_INPUTS["pipe"]["happy_prefix"]}{self.sample_dict["sample_name"]}' # Set prefix as samplename if self.sample_dict["pos_control"]: skip_str = f'{SWConfig.STAGE_INPUTS["pipe"]["happy_skip"]}false' else: @@ -618,9 +620,7 @@ def create_sompy_cmd(self, sample: str) -> str: :param sample (str): Sample name :return (str): Dx run command for sompy app """ - self.logger.info( - self.logger.log_msgs["building_cmd"], "sompy", sample - ) + self.logger.info(self.logger.log_msgs["building_cmd"], "sompy", sample) return " ".join( [ f'{SWConfig.DX_CMDS["sompy"]}Sompy-{sample}', @@ -646,22 +646,22 @@ def return_congenica_cmd(self) -> Union[str, None]: if any([self.sample_dict["neg_control"], self.sample_dict["pos_control"]]): decision_support_cmd = None self.logger.info( - self.logger.log_msgs[ - "decision_support_upload_notrequired" - ], + self.logger.log_msgs["decision_support_upload_notrequired"], self.sample_dict["sample_name"], ) else: self.logger.info( - self.logger.log_msgs[ - "decision_support_upload_required" - ], + self.logger.log_msgs["decision_support_upload_required"], self.sample_dict["sample_name"], ) # If project is specified then upload via upload agent - if self.sample_dict["panel_settings"]["congenica_project"] == "SFTP": # SFTP upload cmd. # TODO eventually remove this + if ( + self.sample_dict["panel_settings"]["congenica_project"] == "SFTP" + ): # SFTP upload cmd. 
# TODO eventually remove this decision_support_cmd = self.build_congenica_sftp_cmd() - elif isinstance(self.sample_dict["panel_settings"]["congenica_project"], int): + elif isinstance( + self.sample_dict["panel_settings"]["congenica_project"], int + ): decision_support_cmd = self.build_congenica_cmd() return decision_support_cmd @@ -695,7 +695,7 @@ def build_congenica_cmd(self) -> str: upload bash script. This command is used to upload the sample to Congenica using the standard Congenica upload app. Takes BAM and VCF inputs, along with config-specified inputs congenica project ID, credentials, IR template and sample name - :param pipeline (str): + :param pipeline (str): :return (str): Dx run command for the Congenica upload (standard Congenica upload app) """ self.logger.info( @@ -707,7 +707,9 @@ def build_congenica_cmd(self) -> str: vcf_input = f'{self.sample_dict["sample_name"]}*.bedfiltered.vcf.gz' bam_input = f'{self.sample_dict["sample_name"]}*.refined.bam' - if self.sample_dict["panel_settings"]["pipeline"] == "wes": # TODO eventually remove this + if ( + self.sample_dict["panel_settings"]["pipeline"] == "wes" + ): # TODO eventually remove this vcf_input = f'{self.sample_dict["sample_name"]}*_markdup_Haplotyper.vcf.gz' bam_input = f'{self.sample_dict["sample_name"]}*_markdup.bam' @@ -750,7 +752,9 @@ def build_qiagen_upload_cmd(self) -> str: ] ) - def build_oncodeep_upload_cmd(self, file_name: str, run_identifier: str, file: str) -> str: + def build_oncodeep_upload_cmd( + self, file_name: str, run_identifier: str, file: str + ) -> str: """ Build the command to write the OncoDEEP upload dx run command to the decision support tool upload bash script. This command is used to upload the sample to @@ -776,7 +780,11 @@ def return_rd_query(self) -> str: Create a query per sample using the DNA number :return query (str): Sample SQL rare disease query """ - pipeline_version = str(SWConfig.SQL_IDS["WORKFLOWS"][self.sample_dict["panel_settings"]["pipeline"]]) + pipeline_version = str( + SWConfig.SQL_IDS["WORKFLOWS"][ + self.sample_dict["panel_settings"]["pipeline"] + ] + ) rd_query = SWConfig.QUERIES["customrun"] % ( f"'{self.sample_dict['identifiers']['primary']}','{pipeline_version}'," f"'{self.runfolder_name}'" @@ -789,7 +797,11 @@ def return_oncology_query(self) -> str: These are recorded along with the pipeline version, run name, and panel ID. :return query (str): Sample SQL oncology query """ - pipeline_version = str(SWConfig.SQL_IDS["WORKFLOWS"][self.sample_dict["panel_settings"]["pipeline"]]) + pipeline_version = str( + SWConfig.SQL_IDS["WORKFLOWS"][ + self.sample_dict["panel_settings"]["pipeline"] + ] + ) panel_id = self.sample_dict["pannum"].replace("Pan", "") onc_query = SWConfig.QUERIES["oncology"] % ( @@ -797,4 +809,3 @@ def return_oncology_query(self) -> str: f"'{self.runfolder_name}','{pipeline_version}','{panel_id}'" ) return onc_query - diff --git a/setoff_workflows/pipeline_emails.py b/setoff_workflows/pipeline_emails.py index 5415c716..02c05b8a 100644 --- a/setoff_workflows/pipeline_emails.py +++ b/setoff_workflows/pipeline_emails.py @@ -5,6 +5,7 @@ - Pipeline started email. 
Contains SQL queries used to update the Moka database - Samples being processed email """ + import logging from config.ad_config import SWConfig from ad_email.ad_email import AdEmail @@ -38,7 +39,13 @@ class PipelineEmails(SWConfig): Construct and send the samples being processed email using AdEmail class """ - def __init__(self, rf_obj: RunfolderObject, rf_samples_obj: RunfolderSamples, sql_queries: str, logger: logging.Logger): + def __init__( + self, + rf_obj: RunfolderObject, + rf_samples_obj: RunfolderSamples, + sql_queries: str, + logger: logging.Logger, + ): """ Constructor for the PipelineEmails class. Calls the class methods """ diff --git a/setoff_workflows/setoff_workflows.py b/setoff_workflows/setoff_workflows.py index cf256c6d..15afc54b 100755 --- a/setoff_workflows/setoff_workflows.py +++ b/setoff_workflows/setoff_workflows.py @@ -29,6 +29,7 @@ - CustomPanelsPipeline Collate commands for Custom Panels workflow. This runtype has no postprocesing commands """ + import sys import os import re @@ -50,10 +51,12 @@ write_lines, execute_subprocess_command, get_samplename_dict, - validate_fastqs, ) from setoff_workflows.pipeline_emails import PipelineEmails -from setoff_workflows.build_dx_commands import BuildRunfolderDxCommands, BuildSampleDxCommands +from setoff_workflows.build_dx_commands import ( + BuildRunfolderDxCommands, + BuildSampleDxCommands, +) from toolbox.toolbox import script_start_logmsg, script_end_logmsg # Set up script logging @@ -159,7 +162,7 @@ def has_demultiplexed(self, rf_obj: object) -> Optional[bool]: if os.path.isfile(rf_obj.bcl2fastqlog_file): logfile_list = read_lines(rf_obj.bcl2fastqlog_file) completed_strs = [ - SWConfig.STRINGS["demultiplex_not_required_msg"].partition(' ')[-1], + SWConfig.STRINGS["demultiplex_not_required_msg"].partition(" ")[-1], SWConfig.STRINGS["demultiplex_success"], ] if logfile_list: @@ -284,7 +287,9 @@ def __init__(self, rf_obj: RunfolderObject, loggers: dict): self.rf_obj = rf_obj self.loggers = loggers self.dnanexus_auth = get_credential(SWConfig.CREDENTIALS["dnanexus_authtoken"]) - open(self.rf_obj.upload_flagfile, 'w').close() # Create upload flag file (prevents processing by other script runs) + open( + self.rf_obj.upload_flagfile, "w" + ).close() # Create upload flag file (prevents processing by other script runs) self.rf_samples_obj = RunfolderSamples(self.rf_obj, self.loggers["sw"]) self.users_dict = self.get_users_dict() self.write_project_creation_script() @@ -305,7 +310,12 @@ def __init__(self, rf_obj: RunfolderObject, loggers: dict): self.write_dx_run_cmds() self.pre_pipeline_upload() self.run_dx_run_commands() - self.pipeline_emails = PipelineEmails(self.rf_obj, self.rf_samples_obj, self.pipeline_obj.sql_queries, self.loggers["sw"]) + self.pipeline_emails = PipelineEmails( + self.rf_obj, + self.rf_samples_obj, + self.pipeline_obj.sql_queries, + self.loggers["sw"], + ) if self.pipeline_obj.sql_queries: self.pipeline_emails.send_sql_email() self.pipeline_emails.send_samples_email() @@ -613,23 +623,35 @@ def build_dx_commands(self) -> object: decision_support_upload_cmds and sql_queries as attributes """ if self.rf_samples_obj.pipeline == "tso500": - pipeline_obj = TsoPipeline(self.rf_obj, self.rf_samples_obj, self.loggers["sw"]) + pipeline_obj = TsoPipeline( + self.rf_obj, self.rf_samples_obj, self.loggers["sw"] + ) if self.rf_samples_obj.pipeline == "archerdx": - pipeline_obj = ArcherDxPipeline(self.rf_obj, self.rf_samples_obj, self.loggers["sw"]) + pipeline_obj = ArcherDxPipeline( + self.rf_obj, self.rf_samples_obj, 
self.loggers["sw"] + ) if self.rf_samples_obj.pipeline == "wes": - pipeline_obj = WesPipeline(self.rf_obj, self.rf_samples_obj, self.loggers["sw"]) + pipeline_obj = WesPipeline( + self.rf_obj, self.rf_samples_obj, self.loggers["sw"] + ) if self.rf_samples_obj.pipeline == "oncodeep": - pipeline_obj = OncoDeepPipeline(self.rf_obj, self.rf_samples_obj, self.loggers["sw"]) + pipeline_obj = OncoDeepPipeline( + self.rf_obj, self.rf_samples_obj, self.loggers["sw"] + ) if self.rf_samples_obj.pipeline == "snp": - pipeline_obj = SnpPipeline(self.rf_obj, self.rf_samples_obj, self.loggers["sw"]) + pipeline_obj = SnpPipeline( + self.rf_obj, self.rf_samples_obj, self.loggers["sw"] + ) if self.rf_samples_obj.pipeline == "pipe": - pipeline_obj = CustomPanelsPipeline(self.rf_obj, self.rf_samples_obj, self.loggers["sw"]) + pipeline_obj = CustomPanelsPipeline( + self.rf_obj, self.rf_samples_obj, self.loggers["sw"] + ) if self.rf_samples_obj.pipeline == "dev": - pipeline_obj = DevPipeline(self.rf_obj, self.rf_samples_obj, self.loggers["sw"]) + pipeline_obj = DevPipeline( + self.rf_obj, self.rf_samples_obj, self.loggers["sw"] + ) - self.loggers["sw"].info( - self.loggers["sw"].log_msgs["cmds_built"] - ) + self.loggers["sw"].info(self.loggers["sw"].log_msgs["cmds_built"]) return pipeline_obj def write_dx_run_cmds(self) -> None: @@ -777,7 +799,7 @@ def run_dx_run_commands(self) -> None: else: self.loggers["sw"].info( self.loggers["sw"].log_msgs["dx_run_success"], - self.rf_obj.runfolder_name + self.rf_obj.runfolder_name, ) def post_pipeline_upload(self) -> None: @@ -803,7 +825,7 @@ class DevPipeline: Collate DNAnexus commands for development runs. This runtype has no decision support upload or postprocessing commands, or SQL queries """ - + def __init__(self, rf_obj: object, rf_samples: object, logger: logging.Logger): """ Constructor for the DevPipeline class @@ -826,7 +848,10 @@ def __init__(self, rf_obj: object, rf_samples: object, logger: logging.Logger): self.workflow_cmds.append(SWConfig.UPLOAD_ARGS["depends_list"]) # Return downstream app commands - self.workflow_cmds.extend(self.rf_cmds_obj.return_multiqc_cmds(self.rf_samples_obj.pipeline)) + self.workflow_cmds.extend( + self.rf_cmds_obj.return_multiqc_cmds(self.rf_samples_obj.pipeline) + ) + class ArcherDxPipeline: """ @@ -854,15 +879,18 @@ def __init__(self, rf_obj: object, rf_samples: object, logger: logging.Logger): self.workflow_cmds.append(sample_cmds_obj.create_fastqc_cmd()) self.workflow_cmds.append(SWConfig.UPLOAD_ARGS["depends_list"]) - self.sql_queries.append(sample_cmds_obj.return_oncology_query()) # Get SQL queries + self.sql_queries.append( + sample_cmds_obj.return_oncology_query() + ) # Get SQL queries # Return downstream app commands - self.workflow_cmds.extend(self.rf_cmds_obj.return_multiqc_cmds(self.rf_samples_obj.pipeline)) + self.workflow_cmds.extend( + self.rf_cmds_obj.return_multiqc_cmds(self.rf_samples_obj.pipeline) + ) self.workflow_cmds.append(self.rf_cmds_obj.create_duty_csv_cmd()) class SnpPipeline: # TODO eventually remove this and associated pipeline-specific functions - """ Collate DNAnexus commands for SNP runs. 
This run type has no decision support upload or post processing commands @@ -871,6 +899,7 @@ class SnpPipeline: # TODO eventually remove this and associated pipeline-specif workflow_cmd (str): Dx run command for the sample workflow query (str): Sample-level SQL query """ + def __init__(self, rf_obj: object, rf_samples: object, logger: logging.Logger): self.rf_obj = rf_obj @@ -890,18 +919,23 @@ def __init__(self, rf_obj: object, rf_samples: object, logger: logging.Logger): self.workflow_cmds.append(sample_cmds_obj.create_snp_cmd()) self.workflow_cmds.append(SWConfig.UPLOAD_ARGS["depends_list"]) - self.sql_queries.append(sample_cmds_obj.return_rd_query()) # Get SQL queries + self.sql_queries.append( + sample_cmds_obj.return_rd_query() + ) # Get SQL queries # Return downstream app commands - self.workflow_cmds.extend(self.rf_cmds_obj.return_multiqc_cmds(self.rf_samples_obj.pipeline)) + self.workflow_cmds.extend( + self.rf_cmds_obj.return_multiqc_cmds(self.rf_samples_obj.pipeline) + ) self.workflow_cmds.append(self.rf_cmds_obj.create_duty_csv_cmd()) -class OncoDeepPipeline(): +class OncoDeepPipeline: """ Collate DNAnexus commands for OncoDEEP runs. This runtype has no post processing commands or decision support upload script, as the decision support commands are run automatically therefore reside in the dx run script """ + def __init__(self, rf_obj: object, rf_samples: object, logger: logging.Logger): self.rf_obj = rf_obj self.rf_samples_obj = rf_samples @@ -926,7 +960,9 @@ def __init__(self, rf_obj: object, rf_samples: object, logger: logging.Logger): sample_cmds_obj.build_oncodeep_upload_cmd( f"{sample_name}-{read}", self.rf_samples_obj.nexus_runfolder_suffix, - self.rf_samples_obj.samples_dict[sample_name]["fastqs"][read]["nexus_path"] + self.rf_samples_obj.samples_dict[sample_name]["fastqs"][read][ + "nexus_path" + ], ) ) # Generate command for MasterFile upload @@ -934,11 +970,13 @@ def __init__(self, rf_obj: object, rf_samples: object, logger: logging.Logger): sample_cmds_obj.build_oncodeep_upload_cmd( self.rf_obj.masterfile_name, self.rf_samples_obj.nexus_runfolder_suffix, - f"{self.rf_obj.DNANEXUS_PROJ_ID}:{self.rf_obj.masterfile_name}" + f"{self.rf_obj.DNANEXUS_PROJ_ID}:{self.rf_obj.masterfile_name}", ) ) # Return downstream app commands - self.workflow_cmds.extend(self.rf_cmds_obj.return_multiqc_cmds(self.rf_samples_obj.pipeline)) + self.workflow_cmds.extend( + self.rf_cmds_obj.return_multiqc_cmds(self.rf_samples_obj.pipeline) + ) self.workflow_cmds.append(self.rf_cmds_obj.create_duty_csv_cmd()) @@ -947,6 +985,7 @@ class TsoPipeline: Collate commands for TSO workflow. 
This runtype has postprocessing commands and decision support upload commands, and SQL queries """ + def __init__(self, rf_obj: object, rf_samples: object, logger: logging.Logger): self.rf_obj = rf_obj self.rf_samples_obj = rf_samples @@ -962,7 +1001,7 @@ def __init__(self, rf_obj: object, rf_samples: object, logger: logging.Logger): # Exclude base SampleSheet as we only want to use split SampleSheets if tso_ss != self.rf_obj.samplesheet_name: self.workflow_cmds.append(self.rf_cmds_obj.create_tso500_cmd(tso_ss)) - + # Create per-sample commands for sample_name in self.rf_samples_obj.samples_dict.keys(): sample_cmds_obj = BuildSampleDxCommands( @@ -987,25 +1026,36 @@ def __init__(self, rf_obj: object, rf_samples: object, logger: logging.Logger): self.dx_postprocessing_cmds.append(SWConfig.UPLOAD_ARGS["depends_list"]) if self.rf_samples_obj.samples_dict[sample_name]["pos_control"]: - self.dx_postprocessing_cmds.append(sample_cmds_obj.create_sompy_cmd(sample_name)) + self.dx_postprocessing_cmds.append( + sample_cmds_obj.create_sompy_cmd(sample_name) + ) # Only add to depends_list if job ID from previous command # is not empty self.dx_postprocessing_cmds.append(SWConfig.UPLOAD_ARGS["depends_list"]) - if not self.rf_samples_obj.samples_dict[sample_name]["neg_control"] or self.rf_samples_obj.samples_dict[sample_name]["neg_control"]: - self.decision_support_upload_cmds.append(sample_cmds_obj.build_qiagen_upload_cmd()) # Build decision support upload commands - - self.sql_queries.append(sample_cmds_obj.return_oncology_query()) # Build SQL query + if ( + not self.rf_samples_obj.samples_dict[sample_name]["neg_control"] + or self.rf_samples_obj.samples_dict[sample_name]["neg_control"] + ): + self.decision_support_upload_cmds.append( + sample_cmds_obj.build_qiagen_upload_cmd() + ) # Build decision support upload commands + + self.sql_queries.append( + sample_cmds_obj.return_oncology_query() + ) # Build SQL query - self.dx_postprocessing_cmds.extend(self.rf_cmds_obj.return_multiqc_cmds(self.rf_samples_obj.pipeline)) + self.dx_postprocessing_cmds.extend( + self.rf_cmds_obj.return_multiqc_cmds(self.rf_samples_obj.pipeline) + ) self.dx_postprocessing_cmds.append(self.rf_cmds_obj.create_duty_csv_cmd()) class WesPipeline: # TODO eventually remove this and associated pipeline-specific functions - """ Collate commands for WES workflow. 
This runtype has no postprocesing commands """ + def __init__(self, rf_obj: object, rf_samples: object, logger: logging.Logger): self.rf_obj = rf_obj @@ -1023,16 +1073,25 @@ def __init__(self, rf_obj: object, rf_samples: object, logger: logging.Logger): self.rf_samples_obj.samples_dict[sample_name], self.logger, ) - self.workflow_cmds.extend([sample_cmds_obj.create_wes_cmd(), SWConfig.UPLOAD_ARGS["depends_list"]]) - - self.decision_support_upload_cmds.append(sample_cmds_obj.return_congenica_cmd()) - self.sql_queries.append(sample_cmds_obj.return_rd_query()) # Get SQL queries + self.workflow_cmds.extend( + [sample_cmds_obj.create_wes_cmd(), SWConfig.UPLOAD_ARGS["depends_list"]] + ) + self.decision_support_upload_cmds.append( + sample_cmds_obj.return_congenica_cmd() + ) + self.sql_queries.append( + sample_cmds_obj.return_rd_query() + ) # Get SQL queries - self.workflow_cmds.extend([self.rf_cmds_obj.create_peddy_cmd(), SWConfig.UPLOAD_ARGS["depends_list"]]) + self.workflow_cmds.extend( + [self.rf_cmds_obj.create_peddy_cmd(), SWConfig.UPLOAD_ARGS["depends_list"]] + ) # Return downstream app commands - self.workflow_cmds.extend(self.rf_cmds_obj.return_multiqc_cmds(self.rf_samples_obj.pipeline)) + self.workflow_cmds.extend( + self.rf_cmds_obj.return_multiqc_cmds(self.rf_samples_obj.pipeline) + ) self.workflow_cmds.append(self.rf_cmds_obj.create_duty_csv_cmd()) wes_dnanumbers = [ @@ -1047,9 +1106,9 @@ class CustomPanelsPipeline: Collate commands for Custom Panels workflow. This runtype has no postprocesing commands """ + def __init__(self, rf_obj: object, rf_samples: object, logger: logging.Logger): - """ - """ + """ """ self.rf_obj = rf_obj self.rf_samples_obj = rf_samples self.logger = logger @@ -1068,11 +1127,19 @@ def __init__(self, rf_obj: object, rf_samples: object, logger: logging.Logger): # Add to gatk depends list because RPKM / ExomeDepth must depend only upon the # sample workflows completing successfully, whilst other downstream # apps depend on all prior jobs completing succesfully - self.workflow_cmds.extend([sample_cmds_obj.create_pipe_cmd(), SWConfig.UPLOAD_ARGS["depends_list"], SWConfig.UPLOAD_ARGS["depends_list_gatk"]]) + self.workflow_cmds.extend( + [ + sample_cmds_obj.create_pipe_cmd(), + SWConfig.UPLOAD_ARGS["depends_list"], + SWConfig.UPLOAD_ARGS["depends_list_gatk"], + ] + ) self.sql_queries.append(sample_cmds_obj.return_rd_query()) - self.decision_support_upload_cmds.append(sample_cmds_obj.return_congenica_cmd()) - + self.decision_support_upload_cmds.append( + sample_cmds_obj.return_congenica_cmd() + ) + # CNV calling steps are a dependency of MultiQC cmd_list = [] for core_panel in ["vcp1", "vcp2", "vcp3"]: @@ -1085,13 +1152,25 @@ def __init__(self, rf_obj: object, rf_samples: object, logger: logging.Logger): core_panel_pannos = [ self.rf_samples_obj.samples_dict[k]["pannum"] for k, v in self.rf_samples_obj.samples_dict.items() - if self.rf_samples_obj.samples_dict[k]["panel_settings"]["panel_name"] + if self.rf_samples_obj.samples_dict[k]["panel_settings"][ + "panel_name" + ] == core_panel ] # Make sure there are enough samples for RPKM and ExomeDepth if len(core_panel_pannos) >= 3: - self.workflow_cmds.extend([self.rf_cmds_obj.create_rpkm_cmd(core_panel), SWConfig.UPLOAD_ARGS["depends_list_cnvcalling"]]) - self.workflow_cmds.extend([self.rf_cmds_obj.create_ed_readcount_cmd(core_panel), SWConfig.UPLOAD_ARGS["depends_list_edreadcount"]]) + self.workflow_cmds.extend( + [ + self.rf_cmds_obj.create_rpkm_cmd(core_panel), + SWConfig.UPLOAD_ARGS["depends_list_cnvcalling"], + ] 
+ ) + self.workflow_cmds.extend( + [ + self.rf_cmds_obj.create_ed_readcount_cmd(core_panel), + SWConfig.UPLOAD_ARGS["depends_list_edreadcount"], + ] + ) for panno in set(core_panel_pannos): if ( SWConfig.CAPTURE_PANEL_DICT[core_panel][ @@ -1099,17 +1178,22 @@ def __init__(self, rf_obj: object, rf_samples: object, logger: logging.Logger): ] and SWConfig.PANEL_DICT[panno]["ed_cnvcalling_bedfile"] ): - self.workflow_cmds.extend([self.rf_cmds_obj.create_ed_cnvcalling_cmd(panno), SWConfig.UPLOAD_ARGS["depends_list_cnvcalling"]]) + self.workflow_cmds.extend( + [ + self.rf_cmds_obj.create_ed_cnvcalling_cmd(panno), + SWConfig.UPLOAD_ARGS["depends_list_cnvcalling"], + ] + ) else: self.logger.info( - self.logger.log_msgs[ - "insufficient_samples_for_cnv" - ], + self.logger.log_msgs["insufficient_samples_for_cnv"], core_panel, ) self.workflow_cmds.append(SWConfig.UPLOAD_ARGS["depends_list_gatk_recombined"]) - self.workflow_cmds.extend(self.rf_cmds_obj.return_multiqc_cmds(self.rf_samples_obj.pipeline)) + self.workflow_cmds.extend( + self.rf_cmds_obj.return_multiqc_cmds(self.rf_samples_obj.pipeline) + ) # We want duty_csv to also depend on the cnv calling jobs for PIPE workflows self.workflow_cmds.append(SWConfig.UPLOAD_ARGS["depends_list_cnv_recombined"]) diff --git a/setoff_workflows/test_setoff_workflows.py b/setoff_workflows/test_setoff_workflows.py new file mode 100755 index 00000000..3d452509 --- /dev/null +++ b/setoff_workflows/test_setoff_workflows.py @@ -0,0 +1,4 @@ +""" setoff_workflows.py pytest unit tests. Test suite is incomplete +""" + +# TODO finish this test suite as it is currently incomplete diff --git a/test/README.md b/test/README.md deleted file mode 100755 index 6fb6babd..00000000 --- a/test/README.md +++ /dev/null @@ -1,70 +0,0 @@ -# Testing module - -This module contains pytest scripts for testing the various modules within this repository. It also contains the data that is used for these test cases. - -* [automate_demultiplexing_logfiles](data/automate_demultiplexing_logfiles/) - this contains subdirectories for storing logfiles created during testing (these folders are copied over to a temporary directory at the start of each test) -* [demultiplex_test_files](data/demultiplex_test_files/) - test cases used by test_demultiplex.py -* [samplesheets](data/samplesheets) - SampleSheet test cases, used by test_samplesheet_validator.py and test_demultiplex.py - -## Running the tests - -Tests can be executed using the following command. It is important to include the ignore flag to prevent pytest from scanning for tests through all test files, which slows down the tests considerably - -```bash -python3 -m pytest -``` - -## Demultiplex.py tests - -This directory contains test files used in the demultiplex test suite. - - - -test_runfolders contains runfolders used to test GetRunfolders().rundemultiplexrunfolders(), -DemultiplexRunfolder.run_demultiplexing() and DemultiplexRunfolder.check_demultiplexing_required(), -and GetRunfolders.loop_through_runs(). - -The test cases are described below. - - -### Test SampleSheets - -Lone SampleSheet test cases are detailed below. These have been created for the purpose of testing SampleSheet related functions in the demultiplex script (valid_samplesheet and no_disallowed_sserrs). 
The test cases are as follows: - -#### [Valid SampleSheets](data/samplesheets/valid) - -| SampleSheet name | Run Type | -| ---- | -------- | -| 210408_M02631_0186_000000000-JFMNK_SampleSheet.csv | SNP | # DONE -| 210917_NB551068_0409_AH3YNFAFX3_SampleSheet.csv | Custom Panel | # DONE -| 221021_A01229_0145_BHGGTHDMXY_SampleSheet.csv | TSO500 | # DONE -| 221024_A01229_0146_BHKGG2DRX2_SampleSheet.csv | WES Skin | # DONE - -#### [Invalid SampleSheets](data/samplesheets/invalid/) -# TODO check if these cover all cases - -| SampleSheet Name | Details | Expected behaviour | -| ---- | ------- | ------------------ | -| 21aA08_A01229_0040_AHKGTFDRXY_SampleSheet.csv | Empty SampleSheet with invalid name (letter in date) | -| 21108_A01229_0040_AHKGTFDRXY_SampleSheet.csv | Empty SampleSheet with invalid name (date too short) | -| 220413_A01229_0032_AHGKBIEKFR_SampleSheet.csv | Empty SampleSheet | -| 200817_NB068_0009_AH3YERAFX3_SampleSheet.csv | Custom Panel SampleSheet with invalid name (invalid sequencer ID), invalid contents (invalid header, invalid sample names, non-matching sample names, invalid pan number, invalid runtype) | # DONE -| 210513_M02631_0236_000000000-JFMNK_SampleSheet.csv | SNP SampleSheet with invalid characters in the sample name | -| 220404_B01229_0348_HFGIFEIOPY_SampleSheet.csv | TSO SampleSheet with invalid name (invalid sequencer ID), invalid contents (invalid header, invalid sample names, non-matching sample names, invalid pan number, invalid runtype) | # DONE -| 220408_A02631_0186_000000000-JLJFE_SampleSheet.csv | SNP SampleSheet with invalid contents (invalid header, invalid sample names, non-matching sample names, invalid pan number, invalid runtype) | # DONE -| 2110915_M02353_0632_000000000-K242J_SampleSheet.csv | SNP SampleSheet with invalid name (date too long), invalid contents (invalid header, invalid sample names, non-matching sample names, invalid pan number, invalid runtype) | # DONE - -### test_runfolders -| Runfolder | Details | Expected behaviour | -| ----------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ----------------------------------------------------------------------------------------------------------- | -| 999999_A01229_0000_00000TEST1 | bcl2fastq2_output.log (Demultiplexing already complete) | demultiplexing_requried returns False | -| 999999_A01229_0000_00000TEST2 | No flag files (Sequencing not finished) | demultiplexing_requried returns False | -| 999999_A01229_0000_00000TEST3 | RTAComplete.txt, invalid SampleSheet present in test samplesheet dir with disallowed errors that would cause demultiplexing to fail (Sequencing complete but no processing has taken place yet) | demultiplexing_requried returns False | -| 999999_M02631_0000_00000TEST4 | RTAComplete.txt, matching valid SampleSheet present in test samplesheet dir, InterOp and RunInfo.xml files for Picard CollectIlluminaLaneMetrics calculation, integrity check not required (Sequencing complete but no processing has taken place yet) | demultiplexing_requried returns True | -| 999999_A01229_0000_00000TEST5 | RTAComplete.txt, matching valid SampleSheet present in test samplesheet dir, integrity check required, but no checksum file (Sequencing complete but no 
processing has taken place yet) | demultiplexing_requried returns False | -| 999999_A01229_0000_00000TEST6 | RTAComplete.txt, matching valid SampleSheet present in test samplesheet dir, integrity check required, md5checksum.txt present and contains integrity check fail string (Sequencing complete but no processing has taken place yet, previous integrity check has failed) | demultiplexing_requried returns False | -| 999999_A01229_0000_00000TEST7 | RTAComplete.txt (sequencing complete) , matching valid SampleSheet present in test samplesheet dir, InterOp and RunInfo.xml files for Picard CollectIlluminaLaneMetrics calculation, integrity check required, md5checksum.txt present and contains matching checksums but no previously checked checksums string (Sequencing complete but no processing has taken place yet, integrity check passed) | demultiplexing_required returns True | -| 999999_A01229_0000_00000TEST8 | Matching valid SampleSheet present in samplesheet dir containing TSO samples | run_demultiplexing returns False, self.run_processed == True | -| 999999_A01229_0000_00000TEST9 | RTAComplee.txt (sequencing complete), Matching valid SampleSheet present in samplesheet dir containing non-TSO | run_demultiplexing returns True, self.run_processed == True (bcl2fastq2 command replaced by a dummy command) | -| 999999_A01229_0000_0000TEST10 | RTAComplete.txt (sequencing complete), SampleSheet missing, integrity check not required (md5checksum.txt present and contains matching checksums with a previously checked checksums string - processing has taken place, integrity check passed) | demultiplexing_required returns False | -| 999999_A01229_0000_0000TEST11 | RTAComplete.txt, SampleSheet present and contains TSO samples, integrity check required (md5checksum.txt present and contains matching checksums but no previously checked checksums string - no processing has taken place) | | demultiplexing_required returns True | diff --git a/test/test_setoff_workflows.py b/test/test_setoff_workflows.py deleted file mode 100755 index e69de29b..00000000 diff --git a/test/test_toolbox.py b/toolbox/test_toolbox.py similarity index 100% rename from test/test_toolbox.py rename to toolbox/test_toolbox.py diff --git a/toolbox/toolbox.py b/toolbox/toolbox.py index cf8ed255..6ad0ede5 100755 --- a/toolbox/toolbox.py +++ b/toolbox/toolbox.py @@ -10,12 +10,15 @@ - SampleObject Collect sample-specific attributes for a sample """ + import sys import os import re import subprocess import logging -import datetime +import time +import seglh_naming +from pathlib import Path from typing import Tuple from distutils.spawn import find_executable from typing import Union, Optional @@ -75,6 +78,11 @@ def return_scriptlog_config() -> dict: "sw_script_logfiles", f"{ToolboxConfig.TIMESTAMP}_setoff_workflow.log", ), + "wscleaner": os.path.join( # Record wscleaner script logs + ToolboxConfig.AD_LOGDIR, + "wscleaner_logs", + f"{ToolboxConfig.TIMESTAMP}_wscleaner.log", + ), } @@ -107,6 +115,7 @@ def script_end_logmsg(logger: logging.Logger, file: str) -> None: os.path.basename(os.path.dirname(file)), ) + def git_tag() -> str: """ Obtain the git tag of the current commit @@ -261,7 +270,9 @@ def get_num_processed_runfolders( return num_processed_runfolders -def get_samplename_dict(logger: logging.Logger, samplesheet_path: str) -> Optional[dict]: +def get_samplename_dict( + logger: logging.Logger, samplesheet_path: str +) -> Optional[dict]: """ Read SampleSheet to create a dict of samples and their pan numbers for the run. 
Reads file into list and loops through in reverse allowing us to access @@ -302,9 +313,7 @@ def validate_fastqs(fastq_dir_path: str, logger: logging.Logger) -> Optional[boo :param logger (logging.Logger): Logger :return Optional[bool]: Return True if fastqs are all determined to be valid """ - fastqs = sorted( - [x for x in os.listdir(fastq_dir_path) if x.endswith("fastq.gz")] - ) + fastqs = sorted([x for x in os.listdir(fastq_dir_path) if x.endswith("fastq.gz")]) returncodes = [] for fastq in fastqs: @@ -363,7 +372,7 @@ class RunfolderObject(ToolboxConfig): samplesheet_validator_logfile (str): SampleSheet validator script logfile (within logfiles dir) logfiles_config (dict): Contains all runfolder log files logfiles_to_upload (list): All logfiles that require upload to DNAnexus - + Methods get_runfolder_loggers(script) Return dictionary of logger.Logging objects for the runfolder @@ -489,8 +498,15 @@ def __init__(self, runfolder_name: str, timestamp: str): self.upload_runfolder_logfile, ] + def age(self) -> int: + """ + Return runfolder age in days + :return age (int): Runfolder age in days + """ + return (time.time() - Path(self.runfolderpath).stat().st_mtime) // (24 * 3600) + def get_runfolder_loggers(self, script: str) -> dict: - """ + """ Return dictionary of logger.Logging objects for the runfolder :param script (str): Script name the function has been called from :return (dict): Dictionary of logger.Logging objects @@ -527,7 +543,7 @@ class RunfolderSamples(ToolboxConfig): undetermined_fastqs_list (list): List of all undetermined fastqs in the run undetermined_fastqs_str (str) Space separated string of all undetermined fastqs in the run, with each fastq encased in quotation marks - + Methods get_pipeline() Use samplename_dict and the ToolboxConfig.PANEL_DICT to get the pipeline name for @@ -543,6 +559,8 @@ class RunfolderSamples(ToolboxConfig): Parse the names in self.samplename_dict to identify the WES batch numbers get_nexus_paths() Build nexus paths, using NGS run numbers (and batch numbers in the case of WES) + get_unique_pannos() + Return set of unique pan numbers for samples within the run get_samples_dict() Create a SampleObject per sample, containing sample-specific properties, and add each SampleObject to a larger samples_dict @@ -558,6 +576,7 @@ class RunfolderSamples(ToolboxConfig): get_undetermined_fastqs_list() Return a list of undetermined fastqs for the run """ + def __init__(self, rf_obj: object, logger: logging.Logger): """ Constructor for the RunfolderSamples class @@ -573,7 +592,7 @@ def __init__(self, rf_obj: object, logger: logging.Logger): self.runtype_str = self.get_runtype() self.nexus_runfolder_suffix = self.get_nexus_runfolder_suffix() self.nexus_paths = self.get_nexus_paths() - self.unique_pannos = set(self.samplename_dict.values()) + self.unique_pannos = self.get_unique_pannos() self.samples_dict = self.get_samples_dict() self.check_for_missing_fastqs() self.fastqs_list = self.get_fastqs_list() @@ -581,7 +600,7 @@ def __init__(self, rf_obj: object, logger: logging.Logger): self.undetermined_fastqs_list = self.get_undetermined_fastqs_list() self.undetermined_fastqs_str = self.get_fastqs_str( self.undetermined_fastqs_list - ) + ) def get_pipeline(self) -> Optional[str]: """ @@ -590,23 +609,27 @@ def get_pipeline(self) -> Optional[str]: pipeline name in the list. 
Returns the most frequent pipeline name in the set :return pipeline_name (Optional[str]): Pipeline name if only one pipeline name in list """ - pipelines_list = [] - for sample, panno in self.samplename_dict.items(): - pipelines_list.append(ToolboxConfig.PANEL_DICT[panno]["pipeline"]) - pipelines_list = sorted(list(set(pipelines_list))) - if len(pipelines_list) > 1: - self.logger.error( - self.logger.log_msgs["multiple_pipeline_names"], - pipelines_list, - ToolboxConfig.PIPELINES, - ) - else: - pipeline_name = pipelines_list[0] # Get pipeline from pipelines_list - self.logger.info( - self.logger.log_msgs["pipeline_name"], - pipeline_name, - ) - return pipeline_name + if self.samplename_dict: + try: + pipelines_list = [] + for sample, panno in self.samplename_dict.items(): + pipelines_list.append(ToolboxConfig.PANEL_DICT[panno]["pipeline"]) + pipelines_list = sorted(list(set(pipelines_list))) + if len(pipelines_list) > 1: + self.logger.error( + self.logger.log_msgs["multiple_pipeline_names"], + pipelines_list, + ToolboxConfig.PIPELINES, + ) + else: + pipeline_name = pipelines_list[0] # Get pipeline from pipelines_list + self.logger.debug( + self.logger.log_msgs["pipeline_name"], + pipeline_name, + ) + return pipeline_name + except Exception: + return None def get_runtype(self) -> str: """ @@ -614,15 +637,22 @@ def get_runtype(self) -> str: in the run for Custom Panels and WES runs where sample types vary (VCP1/2/3/WES/WES EB) :return runtype_str (str): Runtype name string """ - runtype_list = [] - for sample, panno in self.samplename_dict.items(): - runtype_list.append(ToolboxConfig.PANEL_DICT[panno]["runtype"]) - runtype_str = "_".join(sorted(list(set(runtype_list)))) - self.logger.info( - self.logger.log_msgs["runtype_str"], - runtype_str, - ) - return runtype_str + if self.samplename_dict: + try: + runtype_list = [] + for sample, panno in self.samplename_dict.items(): + runtype_list.append(ToolboxConfig.PANEL_DICT[panno]["runtype"]) + if ToolboxConfig.PANEL_DICT[panno]["sample_prefix"]: + if all(ToolboxConfig.PANEL_DICT[panno]["sample_prefix"] not in runtype for runtype in runtype_list): + runtype_list.append(ToolboxConfig.PANEL_DICT[panno]["sample_prefix"]) + runtype_str = "_".join(sorted(list(set(runtype_list)))) + self.logger.debug( + self.logger.log_msgs["runtype_str"], + runtype_str, + ) + return runtype_str + except Exception: + return None def get_nexus_runfolder_suffix(self) -> str: """ @@ -632,16 +662,17 @@ def get_nexus_runfolder_suffix(self) -> str: :return suffix (str): String of '_' delimited unique library numbers, and WES batch numbers if run is a WES run, followed by the runtype """ - library_numbers = self.capture_library_numbers() + if self.samplename_dict: + library_numbers = self.capture_library_numbers() + + if self.pipeline == "wes": + library_numbers.extend(self.capture_wes_batch_numbers()) - if self.pipeline == "wes": - library_numbers.extend(self.capture_wes_batch_numbers()) + if self.pipeline in ["pipe", "wes", "dev"]: + library_numbers.append(self.runtype_str) - if self.pipeline in ["pipe", "wes", "dev"]: - library_numbers.append(self.runtype_str) - - suffix = f"{'_'.join(library_numbers)}" # Provides more detail on contents of runs in runfolder name - return suffix + suffix = f"{'_'.join(library_numbers)}" # Provides more detail on contents of runs in runfolder name + return suffix def capture_library_numbers(self) -> list: """ @@ -658,15 +689,13 @@ def capture_library_numbers(self) -> list: # Split on underscores to capture library number e.g. 
ONC100 or NGS100 library_numbers.append(samplename.split("_")[0]) if library_numbers: # Should always be library numbers found - self.logger.info( + self.logger.debug( self.logger.log_msgs["library_nos_identified"], ", ".join(sorted(list(set(library_numbers)))), ) return sorted(list(set(library_numbers))) else: # Prompt a slack alert - self.logger.error( - self.logger.log_msgs["library_no_err"] - ) + self.logger.error(self.logger.log_msgs["library_no_err"]) sys.exit(1) def capture_wes_batch_numbers(self) -> list: @@ -684,15 +713,13 @@ def capture_wes_batch_numbers(self) -> list: wesbatch = re.search(r"WES_?\d+", samplename).group() wes_batch_numbers_list.append(wesbatch.replace("_", "")) if wes_batch_numbers_list: - self.logger.info( + self.logger.debug( self.logger.log_msgs["wes_batch_nos_identified"], ", ".join(wes_batch_numbers_list), ) return sorted(list(set(wes_batch_numbers_list))) else: # Prompt a slack alert - self.logger.error( - self.logger.log_msgs["wes_batch_nos_missing"] - ) + self.logger.error(self.logger.log_msgs["wes_batch_nos_missing"]) sys.exit(1) def get_nexus_paths(self) -> dict: @@ -722,6 +749,14 @@ def get_nexus_paths(self) -> dict: ) return nexus_paths + def get_unique_pannos(self) -> Optional[list]: + """ + Return set of unique pan numbers for samples within the run + :return Optional[list]: List of unique pan numbers if samples identified, else None + """ + if self.samplename_dict: + return set(self.samplename_dict.values()) + def get_samples_dict(self) -> dict: """ Create a SampleObject for each sample which returns a sample dictionary @@ -730,24 +765,25 @@ def get_samples_dict(self) -> dict: :return samples_dict (dict): Dictionary of SampleObject per sample, containing sample-specific attributes """ - samples_dict = {} - for sample_name in self.samplename_dict.keys(): - self.sample_obj = SampleObject( - sample_name, - self.pipeline, - self.logger, - self.fastq_dir_path, - self.nexus_paths, - self.nexus_runfolder_suffix, - ) - if self.sample_obj.fastqs_dict: - samples_dict[sample_name] = self.sample_obj.return_sample_dict() - else: - self.logger.warning( - self.logger.log_msgs["sample_excluded"], + if self.samplename_dict: + samples_dict = {} + for sample_name in self.samplename_dict.keys(): + self.sample_obj = SampleObject( sample_name, + self.pipeline, + self.logger, + self.fastq_dir_path, + self.nexus_paths, + self.nexus_runfolder_suffix, ) - return samples_dict + if self.sample_obj.fastqs_dict: + samples_dict[sample_name] = self.sample_obj.return_sample_dict() + else: + self.logger.warning( + self.logger.log_msgs["sample_excluded"], + sample_name, + ) + return samples_dict def check_for_missing_fastqs(self) -> None: """ @@ -757,71 +793,64 @@ def check_for_missing_fastqs(self) -> None: the samples_dict so that they are processed :return None: """ - missing_samples = [] - for fastq_dir_file in os.listdir(self.fastq_dir_path): - if os.path.isfile(fastq_dir_file): - if fastq_dir_file.endswith("fastq.gz"): - self.logger.info( - self.logger.log_msgs["checking_fastq"], - fastq_dir_file, - ) - if self.fastq_not_undetermined( - fastq_dir_file - ): # Exclude undetermined - try: - seglh_namingSample.from_string(fastq_dir_file) - sample_name = [ - sample_name - for sample_name in self.samplename_dict.keys() - if sample_name in fastq_dir_file - ] - if sample_name: - self.logger.info( - self.logger.log_msgs[ - "sample_match" - ], - fastq_dir_file, - sample_name, - ) - else: + if self.samplename_dict: + missing_samples = [] + for fastq_dir_file in 
os.listdir(self.fastq_dir_path): + if os.path.isfile(fastq_dir_file): + if fastq_dir_file.endswith("fastq.gz"): + self.logger.info( + self.logger.log_msgs["checking_fastq"], + fastq_dir_file, + ) + if self.fastq_not_undetermined( + fastq_dir_file + ): # Exclude undetermined + try: + seglh_naming.Sample.from_string(fastq_dir_file) + sample_name = [ + sample_name + for sample_name in self.samplename_dict.keys() + if sample_name in fastq_dir_file + ] + if sample_name: + self.logger.info( + self.logger.log_msgs["sample_match"], + fastq_dir_file, + sample_name, + ) + else: + self.logger.error( + self.logger.log_msgs["sample_mismatch"], + fastq_dir_file, + ) + sample_name = re.sub( + "R[0-9]_001.fastq.gz", "", fastq_dir_file + ) + missing_samples.append(fastq_dir_file) + except ValueError as exception: self.logger.error( - self.logger.log_msgs[ - "sample_mismatch" - ], + self.logger.log_msgs["fastq_wrong_naming"], fastq_dir_file, + exception, ) - sample_name = re.sub( - "R[0-9]_001.fastq.gz", "", fastq_dir_file - ) - missing_samples.append(fastq_dir_file) - except ValueError as exception: - self.logger.error( - self.logger.log_msgs[ - "fastq_wrong_naming" - ], - fastq_dir_file, - exception, - ) - else: - self.logger.info( - self.logger.log_msgs["not_fastq"], - fastq_dir_file, - ) - for sample_name in missing_samples: # Add the sample to the sample_obj - # Strip end off sample name - sample_name = re.sub(r"_S[0-9]+_R[1-2]{1}_001.fastq.gz", "", sample_name) - self.logger.info( - self.logger.log_msgs["add_missing_sample"], sample_name - ) - self.sample_obj = SampleObject( - sample_name, - self.pipeline, - logger, - self.fastq_dir_path, - self.nexus_paths, - self.nexus_runfolder_suffix, - ) - self.samples_dict[sample_name] = self.sample_obj.return_sample_dict() + else: + self.logger.info( + self.logger.log_msgs["not_fastq"], + fastq_dir_file, + ) + for sample_name in missing_samples: # Add the sample to the sample_obj + # Strip end off sample name + sample_name = re.sub(r"_S[0-9]+_R[1-2]{1}_001.fastq.gz", "", sample_name) + self.logger.info(self.logger.log_msgs["add_missing_sample"], sample_name) + self.sample_obj = SampleObject( + sample_name, + self.pipeline, + self.logger, + self.fastq_dir_path, + self.nexus_paths, + self.nexus_runfolder_suffix, + ) + self.samples_dict[sample_name] = self.sample_obj.return_sample_dict() def fastq_not_undetermined(self, fastq_dir_file: str) -> Optional[bool]: """ @@ -842,18 +871,19 @@ def get_fastqs_list(self) -> list: Return a list of sample fastqs for the run :return fastqs_list (list): List of all sample fastqs in the run """ - fastqs_list = [] - for sample_name in self.samples_dict.keys(): - if self.samples_dict[sample_name]["fastqs"]: - fastqs_list.extend( - [ - self.samples_dict[sample_name]["fastqs"][read]["path"] - for read, path in self.samples_dict[sample_name][ - "fastqs" - ].items() - ] - ) - return fastqs_list + if self.samples_dict: + fastqs_list = [] + for sample_name in self.samples_dict.keys(): + if self.samples_dict[sample_name]["fastqs"]: + fastqs_list.extend( + [ + self.samples_dict[sample_name]["fastqs"][read]["path"] + for read, path in self.samples_dict[sample_name][ + "fastqs" + ].items() + ] + ) + return fastqs_list def get_fastqs_str(self, fastqs_list: list) -> str: """ @@ -863,11 +893,12 @@ def get_fastqs_str(self, fastqs_list: list) -> str: :return (str): Space separated string of fastqs with each fastq encased in quotation marks """ - quotation_marked_list = [] - for fastq in fastqs_list: - quotation_marked = f"'{fastq}'" - 
quotation_marked_list.append(quotation_marked)
-        return " ".join(quotation_marked_list)
+        if fastqs_list:
+            quotation_marked_list = []
+            for fastq in fastqs_list:
+                quotation_marked = f"'{fastq}'"
+                quotation_marked_list.append(quotation_marked)
+            return " ".join(quotation_marked_list)

     def get_undetermined_fastqs_list(self) -> list:
         """
@@ -883,7 +914,6 @@ def get_undetermined_fastqs_list(self) -> list:
         return undetermined_fastqs_list

-
 # TODO eventually adapt this class to use the SamplesheetValidator package
 class SampleObject(ToolboxConfig):
     """
@@ -914,6 +944,8 @@ class SampleObject(ToolboxConfig):
             Determine whether sample contains the control identifier strings
         find_pannum()
             Extract panel number from sample name using regular expression
+        return_panel_settings()
+            Return panel settings for the specified pan number, if exists
         validate_pannum(pannum)
             Check whether pan number is valid
         get_identifiers()
@@ -959,12 +991,7 @@ def __init__(
         self.neg_control = self.check_control(ToolboxConfig.NTCON_IDS, "Negative")
         self.pos_control = self.check_control(ToolboxConfig.PSCON_IDS, "Positive")
         self.pannum = self.find_pannum()
-        self.panel_settings = ToolboxConfig.PANEL_DICT[self.pannum]
-        self.logger.info(
-            self.logger.log_msgs["sample_identified"],
-            self.panel_settings["panel_name"],
-            self.sample_name,
-        )
+        self.panel_settings = self.return_panel_settings()
         self.primary_identifier, self.secondary_identifier = self.get_identifiers()
         self.fastqs_dict = self.get_fastqs_dict()
@@ -976,7 +1003,7 @@ def check_control(self, identifiers: list, control_type: str) -> Optional[bool]:
         :return (Optional[bool]): True if sample contains any specified identifier, else False
         """
         if any(identifier in self.sample_name for identifier in identifiers):
-            self.logger.info(
+            self.logger.debug(
                 self.logger.log_msgs["control_sample"],
                 control_type,
                 self.sample_name,
@@ -989,9 +1016,28 @@ def find_pannum(self) -> Optional[str]:
         :return pannum (Optional[str]): Panel number that matches a config-defined
             panel number, or None if pannum not valid
         """
-        pannum = str(re.search(r"Pan\d+", self.sample_name).group())
-        if self.validate_pannum(pannum):
-            return pannum
+        try:
+            pannum = str(re.search(r"Pan\d+", self.sample_name).group()).strip()
+            if self.validate_pannum(pannum):
+                self.logger.debug(
+                    self.logger.log_msgs["sample_identified"],
+                    ToolboxConfig.PANEL_DICT[pannum]["panel_name"],
+                    self.sample_name,
+                )
+                return pannum
+        except AttributeError:
+            self.logger.error(
+                self.logger.log_msgs["missing_panno"],
+                self.sample_name,
+            )
+
+    def return_panel_settings(self) -> Optional[dict]:
+        """
+        Return panel settings for the specified pan number, if exists
+        :return Optional[dict]: Return dictionary containing panel settings for the Pan number
+        """
+        if self.pannum:
+            return ToolboxConfig.PANEL_DICT[self.pannum]

     def validate_pannum(self, pannum: int) -> bool:
         """
@@ -999,7 +1046,7 @@ def validate_pannum(self, pannum: int) -> bool:
         :return bool: True if pan number is valid, else None
         """
         if str(pannum) in ToolboxConfig.PANELS:
-            self.logger.info(
+            self.logger.debug(
                 self.logger.log_msgs["recognised_panno"],
                 self.sample_name,
                 pannum,
@@ -1084,7 +1131,7 @@ def get_fastq_paths(self, read: str) -> Union[str, str, str]:
             for fastq_path in os.listdir(self.fastq_dir_path)
             if all([substring in fastq_path for substring in matches])
         )[0]
-        self.logger.info(
+        self.logger.debug(
             self.logger.log_msgs["fastq_identified"],
             fastq_name,
             ", ".join(matches),
diff --git
a/upload_runfolder/__main__.py b/upload_runfolder/__main__.py index 0c3d0040..cdf8d92b 100755 --- a/upload_runfolder/__main__.py +++ b/upload_runfolder/__main__.py @@ -5,7 +5,6 @@ See README and docstrings for further details """ -import os import argparse from config.ad_config import URConfig from upload_runfolder.upload_runfolder import UploadRunfolder @@ -16,7 +15,7 @@ script_start_logmsg, script_end_logmsg, ) -from ad_logger.ad_logger import set_root_logger, shutdown_logs +from ad_logger.ad_logger import set_root_logger set_root_logger() @@ -31,7 +30,8 @@ def get_arguments(): parser = argparse.ArgumentParser( description="Uploads runfolder to DNAnexus", usage=( - "Upload user-specified runfolder to DNAnexus, providing a project ID to upload ""to, and any file patterns that should be ignored" + "Upload user-specified runfolder to DNAnexus, providing a project ID to upload " + "to, and any file patterns that should be ignored" ), ) parser.add_argument( # Define arguments diff --git a/test/test_upload_runfolder.py b/upload_runfolder/test_upload_runfolder.py similarity index 100% rename from test/test_upload_runfolder.py rename to upload_runfolder/test_upload_runfolder.py diff --git a/upload_runfolder/upload_runfolder.py b/upload_runfolder/upload_runfolder.py index 7a6f2c19..9288b5af 100755 --- a/upload_runfolder/upload_runfolder.py +++ b/upload_runfolder/upload_runfolder.py @@ -4,6 +4,7 @@ - UploadRunfolder Uploads a runfolder to DNAnexus """ + import sys import os import re @@ -30,7 +31,7 @@ class UploadRunfolder(URConfig): runfolderpath (str): Path of runfolder on workstation dnanexus_auth (str): DNAnexus auth token upload_flagfile (str): Path to flag file that denotes runfolder upload has started - nexus_identifiers (dict): Dictionary of proj_name and proj_id, or False + nexus_identifiers (dict): Dictionary of proj_name and proj_id, or False Methods: find_nexus_project() @@ -438,9 +439,7 @@ def count_uploaded_files(self, ignore: str) -> None: else: grep_ignore = "" - local_file_count = ( - f"find {self.runfolderpath} -type f {grep_ignore} | wc -l" - ) + local_file_count = f"find {self.runfolderpath} -type f {grep_ignore} | wc -l" files_expected, _, _ = execute_subprocess_command( local_file_count, self.logger, "exit_on_fail" ) diff --git a/wscleaner/README.md b/wscleaner/README.md new file mode 100644 index 00000000..bda5d82c --- /dev/null +++ b/wscleaner/README.md @@ -0,0 +1,67 @@ +## Workstation Cleaner (wscleaner) + +The Synnovis Genome Informatics team use a linux workstation to manage sequencing files. These files are uploaded to the DNAnexus service for storage, however clearing the workstation is time intensive. Workstation Cleaner (wscleaner) automates the deletion of local directories that have been uploaded to the DNAnexus cloud storage service. + +A RunFolderManager class will instantiate objects for local runfolders, each of which has an associated DNAnexus project object. The manager loops over the runfolders and deletes them if all checks pass. DNAnexus projects are accessed with the dxpy module, a python wrapper for the DNAnexus API. 
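+
+For orientation, the project-matching check can be sketched with `dxpy` directly. This is a minimal illustration written for this README, not the module's actual implementation - the helper name and the glob-style name match are assumptions:
+
+```python
+import dxpy  # DNAnexus API bindings
+
+def find_single_project(runfolder_name: str):
+    """Return the DNAnexus project ID whose name contains runfolder_name,
+    or None when zero or multiple projects match (both fail the check)."""
+    # dxpy.find_projects yields dicts such as {"id": "project-xxxx", "level": "VIEW"}
+    matches = list(
+        dxpy.find_projects(name=f"*{runfolder_name}*", name_mode="glob", level="VIEW")
+    )
+    if len(matches) == 1:
+        return matches[0]["id"]
+    return None  # ambiguous or missing project: runfolder is not safe to delete
+```
+
+A runfolder only progresses to the remaining checks when exactly one project is found, which is why the sketch treats zero and multiple matches identically.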
+
+## Protocol
+
+When executed, runfolders in the input (root) directory are identified based on:
+* Matching the expected runfolder regex pattern
+
+Runfolders are identified for deletion if meeting the following criteria:
+* A single DNAnexus project is found matching the runfolder name
+* All local FASTQ files are uploaded and in a 'closed' state (for TSO runfolders, there are no local fastqs so this check automatically passes)
+* X logfiles are present in the DNAnexus project `automated_scripts_logfiles` directory (NB X can be set via a command line argument - default is 6)
+* The runfolder's upload log file contains no errors
+
+TSO runfolders must meet the following additional criteria to be identified for deletion:
+* Presence of bcl2fastq2_output.log file
+* Presence of `TSO run.` in the bcl2fastq log file
+* Presence of `_TSO` in the human-readable DNAnexus project name
+
+## Usage
+
+The script takes the following arguments, and can be run in either dry run mode (doesn't delete runfolders) or live mode (deletes runfolders). The script has been developed using python 3.10.6.
+
+_**When running on the workstation, the conda environment must be activated prior to running the wscleaner command.**_
+
+```
+usage: Used to clean up the runfolders directory on the workstation
+
+Used to clean up runfolders that have been successfully uploaded to DNAnexus from the workstation. Will identify runfolders that meet the criteria for
+deletion and delete them if run without the --dry-run flag
+
+options:
+  -h, --help            show this help message and exit
+  -d, --dry-run         Perform a dry run without deleting files
+  -m MIN_AGE, --min-age MIN_AGE
+                        The age (days) a runfolder must be to be deleted
+  -l LOGFILE_COUNT, --logfile-count LOGFILE_COUNT
+                        The number of logfiles a runfolder must have in /automated_scripts_logfiles
+```
+
+
+### Dry run mode
+
+For example, if running in dry run mode:
+
+```
+conda activate python3.10.6 && python3 -m wscleaner --dry-run
+```
+
+### Live mode
+
+If running in production mode:
+
+```
+conda activate python3.10.6 && python3 -m wscleaner
+```
+
+## Testing
+
+Tests should be run, and all should pass, prior to any new release.
+
+```bash
+python3 -m pytest
+```
diff --git a/test/__init__.py b/wscleaner/__init__.py
old mode 100755
new mode 100644
similarity index 100%
rename from test/__init__.py
rename to wscleaner/__init__.py
diff --git a/wscleaner/__main__.py b/wscleaner/__main__.py
new file mode 100644
index 00000000..29a99425
--- /dev/null
+++ b/wscleaner/__main__.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python3
+"""wscleaner
+
+Delete runfolders in a root directory on the condition that they have been uploaded to DNAnexus.
+
+Methods:
+    get_arguments(): Parses command line arguments
+"""
+import argparse
+from toolbox.toolbox import git_tag
+from config.ad_config import BRANCH, RunfolderCleanupConfig
+from .wscleaner import RunFolderManager
+
+
+def get_arguments():
+    """
+    Uses argparse module to define and handle command line input arguments
+    and help menu
+    :return argparse.Namespace (object): Contains the parsed arguments
+    """
+    parser = argparse.ArgumentParser(
+        description=(
+            "Used to clean up runfolders that have been successfully uploaded "
+            "to DNAnexus from the workstation.
Will identify runfolders that "
+            "meet the criteria for deletion and delete them if run without "
+            "the --dry-run flag"
+        ),
+        usage="Used to clean up the runfolders directory on the workstation",
+    )
+    parser.add_argument(
+        "-d",
+        "--dry-run",
+        help="Perform a dry run without deleting files",
+        action="store_true",
+        default=False,
+    )
+    parser.add_argument(
+        "-m",
+        "--min-age",
+        help="The age (days) a runfolder must be to be deleted",
+        type=int,
+        default=14,
+    )
+    parser.add_argument(
+        "-l",
+        "--logfile-count",
+        help="The number of logfiles a runfolder must have in /automated_scripts_logfiles",
+        type=int,
+        default=6,
+    )
+    return parser.parse_args()
+
+
+version = git_tag()
+# Parse CLI arguments. Some arguments will exit the program intentionally. See docstring for detail.
+parsed_args = get_arguments()
+
+
+# If dry-run CLI flag is given, or script is run from the development area,
+# no directories are deleted by the runfolder manager
+if parsed_args.dry_run or BRANCH != "main":
+    dry_run = True  # Protects against deleting the test folders (!!)
+else:
+    dry_run = False
+
+RFM = RunFolderManager(
+    dry_run=dry_run,
+    min_age=parsed_args.min_age,
+    logfile_count=parsed_args.logfile_count,
+)
+RFM.cleanup_runfolders()
diff --git a/wscleaner/conftest.py b/wscleaner/conftest.py
new file mode 100644
index 00000000..3475a5db
--- /dev/null
+++ b/wscleaner/conftest.py
@@ -0,0 +1,138 @@
+"""conftest.py
+
+Config for pytest.
+"""
+
+import os
+import pytest
+import shutil
+import dxpy
+from pathlib import Path
+from config.ad_config import CREDENTIALS
+from ..conftest import test_data_temp
+
+PROJECT_DIR = str(Path(__file__).absolute().parent.parent)  # Project working directory
+
+
+def pytest_addoption(parser):
+    """Add command line options to pytest"""
+    parser.addoption(
+        "--auth_token_file",
+        action="store",
+        default=None,
+        required=True,
+        help="File containing DNAnexus authentication key",
+    )
+
+
+@pytest.fixture(scope="function")
+def data_test_runfolders():
+    """A fixture that returns a list of tuples containing (runfolder_name, upload_runfolder_logfile, fastq_list_file, fastqs_list)."""
+    return [
+        (
+            "999999_NB551068_1234_WSCLEANT01",
+            os.path.join(
+                test_data_temp, "999999_NB551068_1234_WSCLEANT01_upload_runfolder.log"
+            ),
+            os.path.join(test_data_temp, "test_dir_1_fastqs.txt"),
+            [
+                f"{test_data_temp}/999999_NB551068_1234_WSCLEANT01/Data/Intensities/BaseCalls/"
+                + line.strip()
+                for line in open(os.path.join(test_data_temp, "test_dir_1_fastqs.txt"))
+            ],
+        ),
+        (
+            "999999_NB551068_1234_WSCLEANT02",
+            os.path.join(
+                test_data_temp, "999999_NB551068_1234_WSCLEANT02_upload_runfolder.log"
+            ),
+            os.path.join(test_data_temp, "test_dir_2_fastqs.txt"),
+            [
+                f"{test_data_temp}/999999_NB551068_1234_WSCLEANT02/Data/Intensities/BaseCalls/"
+                + line.strip()
+                for line in open(os.path.join(test_data_temp, "test_dir_2_fastqs.txt"))
+            ],
+        ),
+    ]
+
+
+@pytest.fixture(scope="function")
+def data_test_runfolders_fail():
+    """A fixture that returns a list of tuples containing (runfolder_name, upload_runfolder_logfile, fastq_list_file, fastqs_list)."""
+    return [
+        (  # Failure case as fastqs in fastq list are different from those in runfolder
+            "999999_NB551068_1234_WSCLEANT01",
+            os.path.join(
+                test_data_temp, "999999_NB551068_1234_WSCLEANT01_upload_runfolder.log"
+            ),
+            os.path.join(test_data_temp, "test_dir_1_fastqs.txt"),
+            [
+                f"{test_data_temp}/999999_NB551068_1234_WSCLEANT02/Data/Intensities/BaseCalls/"
+                + line.strip()
+                for line in open(os.path.join(test_data_temp, "test_dir_2_fastqs.txt"))
+            ],
+        ),
+        (  # Failure case as runfolder name doesn't match an existing
DNAnexus project + "999999_NB551068_2468_WSCLEANT02", + os.path.join( + test_data_temp, "999999_NB551068_2468_WSCLEANT02_upload_runfolder.log" + ), + os.path.join(test_data_temp, "test_dir_2_fastqs.txt"), + [ + f"{test_data_temp}/999999_NB551068_1234_WSCLEANT02/Data/Intensities/BaseCalls/" + + line.strip() + for line in open(os.path.join(test_data_temp, "test_dir_2_fastqs.txt")) + ], + ), + ] + + +def data_test_runfolder_uploaderror(): + """ """ + + +@pytest.fixture(scope="function", autouse=True) +def create_test_dirs(data_test_runfolders, request, monkeypatch): + """Create test data for testing. + + This is an autouse fixture with session function, meaning it is run once per test + """ + for test_case in data_test_runfolders: + runfolder_name, upload_runfolder_logfile, fastq_list_file, fastqs_list = ( + test_case + ) + # Create the runfolder directory as per Illumina spec + runfolder_path = os.path.join(test_data_temp, runfolder_name) + fastqs_path = os.path.join( + test_data_temp, f"{runfolder_path}/Data/Intensities/BaseCalls" + ) + Path(fastqs_path).mkdir(parents=True, exist_ok=True) + # Create dummy logfile + # open(upload_runfolder_logfile, 'w').close() + # Generate empty fastqfiles in runfolder + with open(fastq_list_file) as f: + fastq_list = f.read().splitlines() + for fastq_file in fastq_list: + Path(fastqs_path, fastq_file).touch(mode=777, exist_ok=True) + open( + os.path.join(runfolder_path, "RTAComplete.txt"), "w" + ).close() # Create RTAComplete file + open( + upload_runfolder_logfile, "w" + ).close() # Create dummy upload runfolder log file + with open( + CREDENTIALS["dnanexus_authtoken"] + ) as f: # Setup dxpy authentication token read from command line file + auth_token = f.read().rstrip() + dxpy.set_security_context( + {"auth_token_type": "Bearer", "auth_token": auth_token} + ) + + yield # Where the testing happens + # TEARDOWN - cleanup after each test + for test_case in data_test_runfolders: + runfolder_name, upload_runfolder_logfile, fastq_list_file, fastqs_list = ( + test_case + ) + runfolder_path = os.path.join(test_data_temp, runfolder_name) + shutil.rmtree(runfolder_path) diff --git a/wscleaner/test_wscleaner.py b/wscleaner/test_wscleaner.py new file mode 100644 index 00000000..848f6b3f --- /dev/null +++ b/wscleaner/test_wscleaner.py @@ -0,0 +1,100 @@ +import pytest +from . 
import wscleaner
+from ..conftest import test_data_temp
+from config.ad_config import RunfolderCleanupConfig
+
+
+@pytest.fixture
+def rfm(monkeypatch):
+    """
+    Return an instance of the runfolder manager with the test/data directory
+    Monkeypatch is used to overwrite the upload runfolder logfile to the file created
+    in the conftest
+    """
+    monkeypatch.setattr(
+        RunfolderCleanupConfig,
+        "RUNFOLDERS",
+        test_data_temp,
+    )
+    return wscleaner.RunFolderManager()
+
+
+@pytest.fixture
+def rfm_dry(monkeypatch):
+    """Return an instance of the runfolder manager with the test/data directory
+    Monkeypatch is used to overwrite the upload runfolder logfile to the file created
+    in the conftest"""
+    monkeypatch.setattr(
+        RunfolderCleanupConfig,
+        "RUNFOLDERS",
+        test_data_temp,
+    )
+    return wscleaner.RunFolderManager(dry_run=True)
+
+
+class TestCheckRunfolder:
+
+    def test_runfolders_ready(self, data_test_runfolders):
+        """
+        Test that test runfolders pass checks for deletion
+        """
+        for test_case in data_test_runfolders:
+            runfolder_name, upload_runfolder_logfile, fastq_list_file, fastqs_list = (
+                test_case
+            )
+            crf = wscleaner.CheckRunfolder(
+                runfolder_name=runfolder_name,
+                upload_runfolder_logfile=upload_runfolder_logfile,
+                fastqs_list=fastqs_list,
+                logfile_count=6,
+            )
+            assert all(
+                [
+                    crf.dx_project,
+                    crf.check_fastqs(),
+                    crf.check_logfiles(),
+                    crf.upload_log_exists(),
+                    crf.check_upload_log(),
+                ]
+            )
+
+    def test_runfolders_ready_fail(self, data_test_runfolders_fail):
+        """
+        Test that test runfolders fail checks for deletion
+        """
+        for test_case in data_test_runfolders_fail:
+            runfolder_name, upload_runfolder_logfile, fastq_list_file, fastqs_list = (
+                test_case
+            )
+            crf = wscleaner.CheckRunfolder(
+                runfolder_name=runfolder_name,
+                upload_runfolder_logfile=upload_runfolder_logfile,
+                fastqs_list=fastqs_list,
+                logfile_count=6,
+            )
+            assert not all(
+                [
+                    crf.dx_project,
+                    crf.check_fastqs(),
+                    crf.check_logfiles(),
+                    crf.upload_log_exists(),
+                    crf.check_upload_log(),
+                ]
+            )
+
+    def test_to_delete(self, data_test_runfolders):
+        """
+        Test the function correctly identifies that the runfolders require deletion
+        """
+        for test_case in data_test_runfolders:
+            runfolder_name, upload_runfolder_logfile, fastq_list_file, fastqs_list = (
+                test_case
+            )
+            crf = wscleaner.CheckRunfolder(
+                runfolder_name=runfolder_name,
+                upload_runfolder_logfile=upload_runfolder_logfile,
+                fastqs_list=fastqs_list,
+                logfile_count=6,
+            )
+            result = crf.to_delete("NOT_TSO")
+            assert result is True
diff --git a/wscleaner/wscleaner.py b/wscleaner/wscleaner.py
new file mode 100644
index 00000000..1bc0f9d1
--- /dev/null
+++ b/wscleaner/wscleaner.py
@@ -0,0 +1,500 @@
+"""wscleaner.py
+
+Workstation Cleaner (wscleaner) automates the deletion of local directories that have been uploaded
+to the DNAnexus cloud storage service.
+
+Contains the following classes:
+
+- RunFolderManager
+    Contains methods for finding, checking and deleting runfolders in a root directory
+- DxProjectRunFolder
+    A DNAnexus project
+- CheckRunfolder
+    Class for determining whether a runfolder should be deleted, and deleting it
+"""
+
+import re
+import logging
+import shutil
+import datetime
+import os
+import dxpy
+from typing import List, Optional
+from config.ad_config import RunfolderCleanupConfig
+from ad_logger.ad_logger import AdLogger, set_root_logger
+from toolbox.toolbox import (
+    return_scriptlog_config,
+    get_credential,
+    get_runfolder_path,
+    RunfolderObject,
+    RunfolderSamples,
+    script_start_logmsg,
+    script_end_logmsg
+)
+
+# TODO this script can be further simplified in future as it shares functionality with other
+# modules in this repo - functions can be reused
+
+root_logger = set_root_logger()
+
+# DEBUG messages are omitted from the console output by setting the stream handler level
+# to INFO, making console outputs easier to read. DEBUG messages are still written to
+# the application logfile and system log.
+for handler in logging.root.handlers:
+    if isinstance(handler, logging.StreamHandler):
+        handler.setLevel(logging.INFO)
+
+ad_logger_obj = AdLogger(
+    __name__,
+    "wscleaner",
+    return_scriptlog_config()["wscleaner"],
+)
+script_logger = ad_logger_obj.get_logger()
+
+
+# Set DNAnexus authentication token
+dxpy.set_security_context(
+    {
+        "auth_token_type": "Bearer",
+        "auth_token": get_credential(
+            RunfolderCleanupConfig.CREDENTIALS["dnanexus_authtoken"]
+        ),
+    }
+)
+
+
+class RunFolderManager:
+    """
+    Contains methods for finding, checking and deleting runfolders in a root directory
+
+    Attributes
+        runfolders_dir (str): Root directory containing the runfolders, taken from the config
+        dry_run (bool): True if the script should not delete runfolders
+        min_age (int): Minimum age in days a runfolder must be to be assessed
+        logfile_count (int): Expected number of logfiles in the DNAnexus project
+        samplesheets_dir (str): Directory containing the SampleSheets
+
+    Methods
+        cleanup_runfolders()
+            Calls methods for cleaning up runfolders
+        get_runfolders_to_process()
+            Identify runfolders to consider for deletion
+        get_dirs_created_after(path, date_str)
+            Get directories created after a specified date.
+        delete()
+            Delete the local runfolder from the root directory
+    """
+
+    def __init__(self, dry_run=False, min_age=10, logfile_count=6):
+        """
+        Constructor for the RunFolderManager class
+        :param dry_run (bool): True if script should not delete runfolders, False if it should
+        :param min_age (int): Minimum age in days of runfolders that should be assessed by
+            the script
+        :param logfile_count (int): Expected number of logfiles uploaded to the DNAnexus project.
+            Default is 6
+        """
+        self.runfolders_dir = RunfolderCleanupConfig.RUNFOLDERS
+        self.dry_run = dry_run
+        self.min_age = min_age
+        self.logfile_count = logfile_count
+        self.samplesheets_dir = os.path.join(self.runfolders_dir, "samplesheets")
+
+    def cleanup_runfolders(self) -> list:
+        """
+        Calls methods for cleaning up runfolders
+        :return deleted_runfolders (list): Names of the runfolders deleted by this run of the script
+        """
+        script_start_logmsg(script_logger, __file__)
+
+        deleted_runfolders = []  # Names of deleted runfolders are appended to this list
+
+        runfolder_objects = self.get_runfolders_to_process()
+        script_logger.info(
+            f"Found local runfolders to consider deleting: {[rf_obj.runfolder_name for rf_obj, rf_samples_obj in runfolder_objects]}"
+        )
+        for rf_obj, rf_samples_obj in runfolder_objects:
+            cr_obj = CheckRunfolder(
+                rf_obj.runfolder_name,
+                rf_obj.upload_runfolder_logfile,
+                rf_samples_obj.fastqs_list,
+                self.logfile_count,
+            )
+            if cr_obj.to_delete(rf_samples_obj.pipeline):
+                self.delete(rf_obj.runfolder_name, rf_obj.runfolderpath)
+                deleted_runfolders.append(rf_obj.runfolder_name)
+        # Record runfolders removed by this iteration
+        script_logger.info(f"Runfolders deleted in this instance: {deleted_runfolders}")
+        script_end_logmsg(script_logger, __file__)
+
+        return deleted_runfolders
+
+    def get_runfolders_to_process(self) -> list:
+        """
+        Identify runfolders to consider for deletion
+        :return runfolder_objects (list): List of tuples (RunfolderObject, RunfolderSamples)
+        """
+        runfolder_objects = []
+        folders = self.get_dirs_created_after(
+            self.runfolders_dir, "2024-06-12"
+        )  # V45.0.0 of the automated scripts (logfile number changed to 6)
+        for runfolder_path in folders:
+            folder_name = runfolder_path.split("/")[-1]
+            if get_runfolder_path(folder_name) and re.compile(
+                RunfolderCleanupConfig.RUNFOLDER_PATTERN
+            ).match(folder_name):
+                script_logger.debug(
+                    f"Initiating RunfolderObject instance for {folder_name}"
+                )
+                rf_obj = RunfolderObject(folder_name, RunfolderCleanupConfig.TIMESTAMP)
+                rf_age = rf_obj.age()
+                if os.path.exists(
+                    os.path.join(self.samplesheets_dir, f"{folder_name}_SampleSheet.csv")
+                ):
+                    rf_samples_obj = RunfolderSamples(rf_obj, script_logger)
+                    if rf_samples_obj:
+                        if os.path.exists(rf_obj.rtacompletefile_path):
+                            if rf_age >= self.min_age:
+                                if rf_samples_obj.pipeline == "dev":
+                                    script_logger.info(
+                                        f"{rf_obj.runfolder_name} is a DEV runfolder therefore should not be deleted"
+                                    )
+                                else:
+                                    # Catch TSO500 runfolders here (they do not contain fastqs)
+                                    if rf_samples_obj.pipeline == "tso500":
+                                        script_logger.info(
+                                            f"{rf_obj.runfolder_name} is a TSO500 runfolder and is >= {self.min_age} days old"
+                                        )
+                                        runfolder_objects.append((rf_obj, rf_samples_obj))  # Append to list to process
+                                    else:
+                                        # Criteria for runfolder: older than or equal to min_age and contains
+                                        # fastq.gz files. fastqs_list is None if the expected fastqs could not
+                                        # be parsed from the SampleSheet, and an empty list if the run has 0 fastqs
+                                        if rf_samples_obj.fastqs_list is not None:
+                                            if len(rf_samples_obj.fastqs_list) > 0:
+                                                script_logger.info(
+                                                    f"{rf_obj.runfolder_name} contains 1 or more fastqs and is >= {self.min_age} days old"
+                                                )
+                                                runfolder_objects.append((rf_obj, rf_samples_obj))  # Append to list to process
+                                            else:
+                                                script_logger.info(
+                                                    f"{rf_obj.runfolder_name} has 0 fastqs and is not a TSO runfolder"
+                                                )
+                                        else:
+                                            script_logger.info(
+                                                f"{rf_obj.runfolder_name}: Expected fastqs could not be parsed from the SampleSheet for the run"
+                                            )
+                            else:
+                                script_logger.info(
+                                    f"{rf_obj.runfolder_name} is < {self.min_age} days old"
+                                )
+                        else:
+                            script_logger.info(
+                                f"{rf_obj.runfolder_name} is not a runfolder, or sequencing has not yet finished"
+                            )
+                else:
+                    script_logger.info(
f"Corresponding SampleSheet for {rf_obj.runfolder_name} could not be located. This is required for analysing for deletion" + ) + to_assess = [rf_obj.runfolder_name for runfolder_object in runfolder_objects] + all_folders = [folder.split("/")[-1].strip() for folder in folders] + to_skip = [folder for folder in all_folders if folder not in to_assess] + script_logger.info( + "Skipping over folders: " + ", ".join(to_skip) + ) + return runfolder_objects + + def get_dirs_created_after(self, path: str, date_str: str) -> List[str]: + """ + Get directories created after a specified date. + :param path (str): The directory path to check. + :param date_str (str): The date string to compare against in 'YYYY-MM-DD' format + :return List[str]: List of directory paths that were created after the specified date + """ + specified_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') + dirs_created_after = [] + + for dirname in os.listdir(path): + dir_full_path = os.path.join(path, dirname) + if os.path.isdir(dir_full_path): + if os.name == 'nt': + creation_time = os.path.getctime(dir_full_path) + else: + stat_info = os.stat(dir_full_path) + creation_time = getattr(stat_info, 'st_birthtime', stat_info.st_mtime) + + if datetime.datetime.fromtimestamp(creation_time) > specified_date: + dirs_created_after.append(dir_full_path) + + return dirs_created_after + + def delete(self, runfolder_name: str, runfolder_path: str) -> Optional[bool]: + """ + Delete the local runfolder from the root directory + :param runfolder_name (str): Runfolder name + :param runfolder_path (str): Path of runfolder + :return Optional[bool]: Return True if runfolder deleted, + else None + """ + if self.dry_run: + script_logger.info(f"DRY RUN DELETE {runfolder_name}") + else: + shutil.rmtree(runfolder_path) + script_logger.info(f"{runfolder_name} DELETED.") + return True + + +class CheckRunfolder: + """ + Class for determining whether a runfolder should be deleted, and deleting it + + Attributes + runfolder_name (str): Runfolder name + upload_runfolder_logfile (str): Path to upload runfolder logfile + fastqs_list (list): List of fastq files in the local runfolder + dx_project (DxProjectRunfolder): Instance of DxProjectRunfolder + + Methods + check_fastqs() + Returns true if a runfolder's fastq.gz files match those in it's DNAnexus project + check_logfiles() + Returns true if a runfolder's DNAnexus project contains 6 logfiles in the + expected location + upload_log_exists() + Returns true if a runfolder's upload log exists + check_upload_log() + Returns true if a runfolder's upload log contains no upload errors + to_delete(pipeline) + Determine whether a runfolder is safe for deletion + """ + + def __init__( + self, + runfolder_name: str, + upload_runfolder_logfile: str, + fastqs_list: list, + logfile_count: int, + ): + """ + Constructor for the CheckRunfolder object + :param runfolder_name (str): Runfolder name + :param upload_runfolder_logfile (str): Path to upload runfolder logfile + :param fastqs_list (list): List of fastq files in the local runfolder + :param logfile_count (int): Number of logfiles expected in the DNAnexus project + """ + self.runfolder_name = runfolder_name + script_logger.info(f"Processing {self.runfolder_name}") + self.upload_runfolder_logfile = upload_runfolder_logfile + self.fastqs_list = fastqs_list + self.logfile_count = logfile_count + self.dx_project = DxProjectRunFolder(self.runfolder_name) + + def check_fastqs(self) -> bool: + """ + Returns true if a runfolder's fastq.gz files match those in it's DNAnexus 
+        project. Ensures all fastqs were uploaded
+        :return fastq_bool (Optional[bool]): True if all fastqs are present in the DNAnexus
+            project, False if any fastqs are missing, None if no DNAnexus project was found
+        """
+        if self.dx_project:
+            dx_fastqs = self.dx_project.find_fastqs()
+            fastq_bool = True
+            for fastq in self.fastqs_list:  # Local fastqs
+                if fastq.split("/")[-1] not in dx_fastqs:
+                    script_logger.debug(f"Fastq missing from DNAnexus project: {fastq}")
+                    fastq_bool = False
+            script_logger.debug(f"{self.runfolder_name} FASTQ BOOL: {fastq_bool}")
+
+            if not fastq_bool:  # Fastqs not all present in DNAnexus
+                script_logger.warning(f"{self.runfolder_name} - FASTQ MISMATCH")
+            return fastq_bool
+
+    def check_logfiles(self) -> Optional[bool]:
+        """
+        Returns true if a runfolder's DNAnexus project contains logfile_count
+        logfiles in the expected location
+        :return logfile_bool (Optional[bool]): True if the expected number of logfiles are
+            present in the DNAnexus project, False if not, None if no DNAnexus project was found
+        """
+        if self.dx_project:
+            dx_logfiles = self.dx_project.count_logfiles()
+            logfile_bool = dx_logfiles == self.logfile_count
+            script_logger.debug(f"{self.runfolder_name} LOGFILE BOOL: {logfile_bool}")
+            if not logfile_bool:
+                script_logger.warning(f"{self.runfolder_name} - LOGFILE MISMATCH")
+            return logfile_bool
+
+    def upload_log_exists(self) -> Optional[bool]:
+        """
+        Returns true if a runfolder's upload log file exists
+        :return Optional[bool]: Return True if the runfolder upload log file
+            exists, else None
+        """
+        if os.path.exists(self.upload_runfolder_logfile):
+            return True
+        else:
+            script_logger.warning(f"{self.runfolder_name} - UPLOAD LOG MISSING")
+            script_logger.debug(f"{self.runfolder_name} upload log file does not exist")
+
+    def check_upload_log(self) -> bool:
+        """
+        Returns true if a runfolder's upload log file contains no ERROR logs
+        :return upload_log_bool (bool): True if the upload log file exists and contains
+            no errors, else False
+        """
+        upload_log_bool = False
+        if os.path.exists(self.upload_runfolder_logfile):
+            with open(self.upload_runfolder_logfile, "r") as f:
+                log_contents = f.readlines()
+            if any("- ERROR -" in string for string in log_contents):
+                script_logger.debug(f"{self.runfolder_name} upload log contains errors")
+                script_logger.warning(
+                    f"{self.runfolder_name} - UPLOAD LOG CONTAINS ERRORS"
+                )
+                upload_log_bool = False
+            else:
+                upload_log_bool = True
+        script_logger.debug(f"{self.runfolder_name} UPLOAD LOG BOOL: {upload_log_bool}")
+        return upload_log_bool
+
+    def to_delete(self, pipeline: str) -> Optional[bool]:
+        """
+        Determine whether a runfolder is safe for deletion
+        :param pipeline (str): Name of pipeline
+        :return Optional[bool]: Return True if the runfolder is safe to delete, else None
+        """
+        # The runfolder can be deleted if it meets the backup criteria.
+        # dx_project is evaluated first as the following criteria checks depend on it
+        tso_run = False
+
+        if pipeline == RunfolderCleanupConfig.CAPTURE_PANEL_DICT["tso500"]["pipeline"]:
+            tso_run = True
+
+        if self.dx_project:
+            upload_log_exists = self.upload_log_exists()
+            clean_upload_log = self.check_upload_log()
+            logfiles_uploaded = self.check_logfiles()
+
+            if tso_run:
+                # TSO500 runfolders do not contain fastqs, so the fastq check is skipped
+                if all(
+                    [
+                        logfiles_uploaded,
+                        upload_log_exists,
+                        clean_upload_log,
+                    ]
+                ):
+                    return True
+            else:
+                fastqs_uploaded = self.check_fastqs()
+                if all(
+                    [
+                        fastqs_uploaded,
+                        logfiles_uploaded,
+                        upload_log_exists,
+                        clean_upload_log,
+                    ]
+                ):
+                    return True
+
+
+class DxProjectRunFolder:
+    """
+    A DNAnexus runfolder object
+
+    Attributes
+        runfolder_name (str): Runfolder name
+        dnanexus_id (str): Project ID of the matching runfolder project in DNAnexus
+        logfile_dir (str): Directory in the DNAnexus project containing the logfiles
+
+    Methods
+        dx_find_one_project()
+            Find a single DNAnexus project from the input runfolder name
+        find_fastqs()
+            Returns a list of files in the identified DNAnexus project with the fastq.gz extension
+        count_logfiles()
+            Count logfiles in the DNAnexus project, in the /$RUNFOLDER_NAME/automated_scripts_logfiles
+            subdirectory
+    """
+
+    def __init__(self, runfolder_name: str):
+        """
+        Constructor for the DxProjectRunFolder class
+        :param runfolder_name (str): Name of runfolder
+        """
+        self.runfolder_name = runfolder_name
+        self.dnanexus_id = self.dx_find_one_project()
+        self.logfile_dir = str(
+            os.path.join("/", self.runfolder_name, "automated_scripts_logfiles")
+        )
+
+    def dx_find_one_project(self) -> Optional[str]:
+        """
+        Find a single DNAnexus project from the input runfolder name
+        :return Optional[str]: Return the DNAnexus project ID string if identified, else None
+        """
+        try:
+            # Search for the project matching self.runfolder_name.
+            # name_mode='regexp' - look for any occurrence of the runfolder name in the project name.
+            # Setting more_ok/zero_ok to False ensures only one project is successfully returned
+            project = dxpy.find_one_project(
+                name=self.runfolder_name,
+                name_mode="regexp",
+                more_ok=False,
+                zero_ok=False,
+            )
+            script_logger.debug(
+                f'{self.runfolder_name} DNAnexus project: {project["id"]}'
+            )
+            return project["id"]
+        except dxpy.exceptions.DXSearchError as error:
+            # Catch the exception and return None
+            script_logger.warning(
+                f"DX PROJECT MISMATCH - 0 or >1 DNAnexus projects found for {self.runfolder_name}: {error}"
+            )
+            return None
+
+    def find_fastqs(self) -> list:
+        """
+        Return a list of files in the DNAnexus project with the fastq.gz extension
+        :return fastq_filenames (list): List of files in the DNAnexus project
+            with the fastq.gz extension
+        """
+        # Search DNAnexus for files with the fastq.gz extension.
+        # name_mode='regexp' tells dxpy to look for any occurrence of 'fastq.gz' in the filename
+        search_response = dxpy.find_data_objects(
+            project=self.dnanexus_id,
+            classname="file",
+            name="fastq.gz",
+            name_mode="regexp",
+        )
+        file_ids = [result["id"] for result in search_response]
+        # Gather a list of uploaded fastq files with the state 'closed', indicating a completed upload
+        fastq_filenames_unsorted = []
+        for dx_file in file_ids:
+            file_description = dxpy.describe(dx_file)
+            if file_description["state"] == "closed":
+                fastq_filenames_unsorted.append(file_description["name"])
+        # Sort fastq filenames for cleaner logfile output
+        fastq_filenames = sorted(fastq_filenames_unsorted)
+        script_logger.debug(
+            f'{self.dnanexus_id} contains {len(fastq_filenames)} "closed" fastq files: {fastq_filenames}'
+        )
+        return fastq_filenames
+
+    def count_logfiles(self) -> int:
+        """
+        Count logfiles in the DNAnexus project, in the /$RUNFOLDER_NAME/automated_scripts_logfiles
+        subdirectory
+        :return (int): Count of automated scripts logfiles identified in the DNAnexus project
+        """
+        logfile_list = dxpy.find_data_objects(
+            project=self.dnanexus_id, folder=self.logfile_dir, classname="file"
+        )
+        return len(list(logfile_list))
+
+    def __bool__(self) -> bool:
+        """
+        Allows boolean expressions on class instances
+        :return (bool): Return True if a single DNAnexus project was found
+        """
+        return bool(self.dnanexus_id)
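
Reviewer note: for anyone wanting to exercise the new module end to end, the minimal sketch below shows how the classes in this patch compose. It is not part of the diff: the `main()` wiring and the argparse flag names are illustrative assumptions (the repo's real wscleaner entry point may differ), and it assumes the `RunfolderCleanupConfig` values and DNAnexus credentials are in place, since `wscleaner.py` sets the dxpy security context at import time.

```python
# Minimal usage sketch, assuming valid DNAnexus credentials are configured.
# Flag names are illustrative, not the repo's documented interface.
import argparse

from wscleaner.wscleaner import RunFolderManager


def main() -> None:
    parser = argparse.ArgumentParser(
        description="Delete local runfolders that are fully backed up in DNAnexus"
    )
    parser.add_argument("--dry-run", action="store_true", help="Log candidates without deleting")
    parser.add_argument("--min-age", type=int, default=10, help="Minimum runfolder age in days")
    parser.add_argument("--logfile-count", type=int, default=6, help="Expected logfiles per DNAnexus project")
    args = parser.parse_args()

    rfm = RunFolderManager(
        dry_run=args.dry_run,
        min_age=args.min_age,
        logfile_count=args.logfile_count,
    )
    deleted = rfm.cleanup_runfolders()  # Returns the names of deleted runfolders
    print(f"Deleted {len(deleted)} runfolder(s): {deleted}")


if __name__ == "__main__":
    main()
```

Running with `--dry-run` first is the safe path: `RunFolderManager.delete()` then only logs `DRY RUN DELETE <runfolder>` rather than calling `shutil.rmtree`.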