diff --git a/xnat_ingest/cli/stage.py b/xnat_ingest/cli/stage.py index 7bf150c..2540639 100644 --- a/xnat_ingest/cli/stage.py +++ b/xnat_ingest/cli/stage.py @@ -27,27 +27,29 @@ are uploaded to XNAT """, ) -@click.argument("dicoms_path", type=str) -@click.argument("staging_dir", type=click.Path(path_type=Path)) +@click.argument("dicoms_path", type=str, envvar="XNAT_INGEST_STAGE_DICOMS_PATH") +@click.argument( + "staging_dir", type=click.Path(path_type=Path), envvar="XNAT_INGEST_STAGE_DIR" +) @click.option( "--project-field", type=DicomField, default="StudyID", - envvar="XNAT_INGEST_PROJECT", + envvar="XNAT_INGEST_STAGE_PROJECT", help=("The keyword or tag of the DICOM field to extract the XNAT project ID from "), ) @click.option( "--subject-field", type=DicomField, default="PatientID", - envvar="XNAT_INGEST_SUBJECT", + envvar="XNAT_INGEST_STAGE_SUBJECT", help=("The keyword or tag of the DICOM field to extract the XNAT subject ID from "), ) @click.option( "--visit-field", type=DicomField, default="AccessionNumber", - envvar="XNAT_INGEST_SESSION", + envvar="XNAT_INGEST_STAGE_SESSION", help=( "The keyword or tag of the DICOM field to extract the XNAT imaging session ID from " ), @@ -63,7 +65,7 @@ type=AssociatedFiles.cli_type, nargs=2, default=None, - envvar="XNAT_INGEST_ASSOCIATED", + envvar="XNAT_INGEST_STAGE_ASSOCIATED", metavar=" ", help=( 'The "glob" arg is a glob pattern by which to detect associated files to be ' @@ -85,14 +87,14 @@ @click.option( "--delete/--dont-delete", default=False, - envvar="XNAT_INGEST_DELETE", + envvar="XNAT_INGEST_STAGE_DELETE", help="Whether to delete the session directories after they have been uploaded or not", ) @click.option( "--log-level", default="info", type=str, - envvar="XNAT_INGEST_LOGLEVEL", + envvar="XNAT_INGEST_STAGE_LOGLEVEL", help=("The level of the logging printed to stdout"), ) @click.option( @@ -101,7 +103,7 @@ type=LogFile.cli_type, nargs=2, metavar=" ", - envvar="XNAT_INGEST_LOGFILE", + envvar="XNAT_INGEST_STAGE_LOGFILE", help=( 'Location to write the output logs to, defaults to "upload-logs" in the ' "export directory" @@ -114,7 +116,7 @@ nargs=3, metavar="
", multiple=True, - envvar="XNAT_INGEST_LOGEMAIL", + envvar="XNAT_INGEST_STAGE_LOGEMAIL", help=( "Email(s) to send logs to. When provided in an environment variable, " "mail and log level are delimited by ',' and separate destinations by ';'" @@ -126,7 +128,7 @@ nargs=4, metavar=" ", default=None, - envvar="XNAT_INGEST_MAILSERVER", + envvar="XNAT_INGEST_STAGE_MAILSERVER", help=( "the mail server to send logger emails to. When provided in an environment variable, " "args are delimited by ';'" @@ -142,6 +144,7 @@ "--deidentify/--dont-deidentify", default=False, type=bool, + envvar="XNAT_INGEST_STAGE_DEIDENTIFY", help="whether to deidentify the file names and DICOM metadata before staging", ) def stage( diff --git a/xnat_ingest/cli/transfer.py b/xnat_ingest/cli/transfer.py index 2222c21..61ea412 100644 --- a/xnat_ingest/cli/transfer.py +++ b/xnat_ingest/cli/transfer.py @@ -14,54 +14,6 @@ set_logger_handling, ) from .base import cli -import os -import datetime -import boto3 -import paramiko - - -def remove_old_files_on_s3(remote_store: str, threshold: int): - # Parse S3 bucket and prefix from remote store - bucket_name, prefix = remote_store[5:].split("/", 1) - - # Create S3 client - s3_client = boto3.client("s3") - - # List objects in the bucket with the specified prefix - response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix) - - now = datetime.datetime.now() - - # Iterate over objects and delete files older than the threshold - for obj in response.get("Contents", []): - last_modified = obj["LastModified"] - age = (now - last_modified).days - if age > threshold: - s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"]) - - -def remove_old_files_on_ssh(remote_store: str, threshold: int): - # Parse SSH server and directory from remote store - server, directory = remote_store.split("@", 1) - - # Create SSH client - ssh_client = paramiko.SSHClient() - ssh_client.load_system_host_keys() - ssh_client.connect(server) - - # Execute find command to list files in the directory - stdin, stdout, stderr = ssh_client.exec_command(f"find {directory} -type f") - - now = datetime.datetime.now() - - # Iterate over files and delete files older than the threshold - for file_path in stdout.read().decode().splitlines(): - last_modified = datetime.datetime.fromtimestamp(os.path.getmtime(file_path)) - age = (now - last_modified).days - if age > threshold: - ssh_client.exec_command(f"rm {file_path}") - - ssh_client.close() @cli.command( @@ -78,13 +30,13 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int): an SSH server. """, ) -@click.argument("staging_dir", type=str) -@click.argument("remote_store", type=str, envvar="XNAT_INGEST_REMOTE_STORE") +@click.argument("staging_dir", type=str, envvar="XNAT_INGEST_STAGE_DIR") +@click.argument("remote_store", type=str, envvar="XNAT_INGEST_TRANSFER_REMOTE_STORE") @click.option( "--store-credentials", type=click.Path(path_type=Path), metavar=" ", - envvar="XNAT_INGEST_STORE_CREDENTIALS", + envvar="XNAT_INGEST_TRANSFER_STORE_CREDENTIALS", default=None, nargs=2, help="Credentials to use to access of data stored in remote stores (e.g. AWS S3)", @@ -93,7 +45,7 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int): "--log-level", default="info", type=str, - envvar="XNAT_INGEST_LOGLEVEL", + envvar="XNAT_INGEST_TRANSFER_LOGLEVEL", help=("The level of the logging printed to stdout"), ) @click.option( @@ -102,7 +54,7 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int): type=LogFile.cli_type, nargs=2, metavar=" ", - envvar="XNAT_INGEST_LOGFILE", + envvar="XNAT_INGEST_TRANSFER_LOGFILE", help=( 'Location to write the output logs to, defaults to "upload-logs" in the ' "export directory" @@ -115,7 +67,7 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int): nargs=3, metavar="
", multiple=True, - envvar="XNAT_INGEST_LOGEMAIL", + envvar="XNAT_INGEST_TRANSFER_LOGEMAIL", help=( "Email(s) to send logs to. When provided in an environment variable, " "mail and log level are delimited by ',' and separate destinations by ';'" @@ -126,7 +78,7 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int): type=MailServer.cli_type, metavar=" ", default=None, - envvar="XNAT_INGEST_MAILSERVER", + envvar="XNAT_INGEST_TRANSFER_MAILSERVER", help=( "the mail server to send logger emails to. When provided in an environment variable, " "args are delimited by ';'" @@ -135,7 +87,7 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int): @click.option( "--delete/--dont-delete", default=False, - envvar="XNAT_INGEST_DELETE", + envvar="XNAT_INGEST_TRANSFER_DELETE", help="Whether to delete the session directories after they have been uploaded or not", ) @click.option( @@ -151,14 +103,7 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int): default=None, metavar=" ", help="The XNAT server to upload to plus the user and password to use", - envvar="XNAT_INGEST_XNAT_LOGIN", -) -@click.option( - "--clean-up-older-than", - type=int, - metavar="", - default=0, - help="The number of days to keep files in the remote store for", + envvar="XNAT_INGEST_TRANSFER_XNAT_LOGIN", ) def transfer( staging_dir: Path, @@ -171,7 +116,6 @@ def transfer( delete: bool, raise_errors: bool, xnat_login: ty.Optional[ty.Tuple[str, str, str]], - clean_up_older_than: int, ): if not staging_dir.exists(): @@ -281,23 +225,6 @@ def transfer( logger.info("Deleting %s after successful upload", session_dir) shutil.rmtree(session_dir) - if clean_up_older_than: - logger.info( - "Cleaning up files in %s older than %d days", - remote_store, - clean_up_older_than, - ) - if store_type == "s3": - remove_old_files_on_s3( - remote_store=remote_store, threshold=clean_up_older_than - ) - elif store_type == "ssh": - remove_old_files_on_ssh( - remote_store=remote_store, threshold=clean_up_older_than - ) - else: - assert False - if __name__ == "__main__": transfer() diff --git a/xnat_ingest/cli/upload.py b/xnat_ingest/cli/upload.py index 5818721..0282467 100644 --- a/xnat_ingest/cli/upload.py +++ b/xnat_ingest/cli/upload.py @@ -1,5 +1,7 @@ from pathlib import Path import shutil +import os +import datetime import traceback import typing as ty from collections import defaultdict @@ -9,6 +11,7 @@ from tqdm import tqdm from natsort import natsorted import boto3 +import paramiko from fileformats.generic import File from arcana.core.data.set import Dataset from arcana.xnat import Xnat @@ -40,21 +43,21 @@ PASSWORD is the password for the XNAT user, alternatively "XNAT_INGEST_PASS" env. var """, ) -@click.argument("staged", type=str) -@click.argument("server", type=str, envvar="XNAT_INGEST_HOST") -@click.argument("user", type=str, envvar="XNAT_INGEST_USER") -@click.argument("password", type=str, envvar="XNAT_INGEST_PASS") +@click.argument("staged", type=str, envvar="XNAT_INGEST_UPLOAD_STAGED") +@click.argument("server", type=str, envvar="XNAT_INGEST_UPLOAD_HOST") +@click.argument("user", type=str, envvar="XNAT_INGEST_UPLOAD_USER") +@click.argument("password", type=str, envvar="XNAT_INGEST_UPLOAD_PASS") @click.option( "--delete/--dont-delete", default=True, - envvar="XNAT_INGEST_DELETE", + envvar="XNAT_INGEST_UPLOAD_DELETE", help="Whether to delete the session directories after they have been uploaded or not", ) @click.option( "--log-level", default="info", type=str, - envvar="XNAT_INGEST_LOGLEVEL", + envvar="XNAT_INGEST_UPLOAD_LOGLEVEL", help=("The level of the logging printed to stdout"), ) @click.option( @@ -63,7 +66,7 @@ type=LogFile.cli_type, nargs=2, metavar=" ", - envvar="XNAT_INGEST_LOGFILE", + envvar="XNAT_INGEST_UPLOAD_LOGFILE", help=( 'Location to write the output logs to, defaults to "upload-logs" in the ' "export directory" @@ -76,7 +79,7 @@ nargs=3, metavar="
", multiple=True, - envvar="XNAT_INGEST_LOGEMAIL", + envvar="XNAT_INGEST_UPLOAD_LOGEMAIL", help=( "Email(s) to send logs to. When provided in an environment variable, " "mail and log level are delimited by ',' and separate destinations by ';'" @@ -87,7 +90,7 @@ type=MailServer.cli_type, metavar=" ", default=None, - envvar="XNAT_INGEST_MAILSERVER", + envvar="XNAT_INGEST_UPLOAD_MAILSERVER", help=( "the mail server to send logger emails to. When provided in an environment variable, " "args are delimited by ';'" @@ -99,7 +102,7 @@ default=(), type=str, multiple=True, - envvar="XNAT_INGEST_ALWAYSINCLUDE", + envvar="XNAT_INGEST_UPLOAD_ALWAYSINCLUDE", help=( "Scan types to always include in the upload, regardless of whether they are" "specified in a column or not. Specified using the scan types IANA mime-type or " @@ -118,7 +121,7 @@ "--store-credentials", type=str, metavar=" ", - envvar="XNAT_INGEST_STORE_CREDENTIALS", + envvar="XNAT_INGEST_UPLOAD_STORE_CREDENTIALS", default=None, nargs=2, help="Credentials to use to access of data stored in remote stores (e.g. AWS S3)", @@ -127,19 +130,27 @@ "--temp-dir", type=Path, default=None, - envvar="XNAT_INGEST_WORKDIR", + envvar="XNAT_INGEST_UPLOAD_TEMPDIR", help="The directory to use for temporary downloads (i.e. from s3)", ) @click.option( "--use-manifest/--dont-use-manifest", default=None, - envvar="XNAT_INGEST_REQUIRE_MANIFEST", + envvar="XNAT_INGEST_UPLOAD_REQUIRE_MANIFEST", help=( "Whether to use the manifest file in the staged sessions to load the " "directory structure. By default it is used if present and ignore if not there" ), type=bool, ) +@click.option( + "--clean-up-older-than", + type=int, + metavar="", + envvar="XNAT_INGEST_UPLOAD_CLEANUP_OLDER_THAN", + default=0, + help="The number of days to keep files in the remote store for", +) def upload( staged: str, server: str, @@ -155,6 +166,7 @@ def upload( store_credentials: ty.Tuple[str, str], temp_dir: ty.Optional[Path], use_manifest: bool, + clean_up_older_than: int, ): set_logger_handling(log_level, log_file, log_emails, mail_server) @@ -446,3 +458,60 @@ def iter_staged_sessions(): continue else: raise + + if clean_up_older_than: + logger.info( + "Cleaning up files in %s older than %d days", + staged, + clean_up_older_than, + ) + if staged.startswith("s3://"): + remove_old_files_on_s3(remote_store=staged, threshold=clean_up_older_than) + elif "@" in staged: + remove_old_files_on_ssh(remote_store=staged, threshold=clean_up_older_than) + else: + assert False + + +def remove_old_files_on_s3(remote_store: str, threshold: int): + # Parse S3 bucket and prefix from remote store + bucket_name, prefix = remote_store[5:].split("/", 1) + + # Create S3 client + s3_client = boto3.client("s3") + + # List objects in the bucket with the specified prefix + response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix) + + now = datetime.datetime.now() + + # Iterate over objects and delete files older than the threshold + for obj in response.get("Contents", []): + last_modified = obj["LastModified"] + age = (now - last_modified).days + if age > threshold: + s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"]) + + +def remove_old_files_on_ssh(remote_store: str, threshold: int): + # Parse SSH server and directory from remote store + server, directory = remote_store.split("@", 1) + + # Create SSH client + ssh_client = paramiko.SSHClient() + ssh_client.load_system_host_keys() + ssh_client.connect(server) + + # Execute find command to list files in the directory + stdin, stdout, stderr = ssh_client.exec_command(f"find {directory} -type f") + + now = datetime.datetime.now() + + # Iterate over files and delete files older than the threshold + for file_path in stdout.read().decode().splitlines(): + last_modified = datetime.datetime.fromtimestamp(os.path.getmtime(file_path)) + age = (now - last_modified).days + if age > threshold: + ssh_client.exec_command(f"rm {file_path}") + + ssh_client.close() diff --git a/xnat_ingest/tests/test_cli.py b/xnat_ingest/tests/test_cli.py index 6dea77c..7644f67 100644 --- a/xnat_ingest/tests/test_cli.py +++ b/xnat_ingest/tests/test_cli.py @@ -128,7 +128,8 @@ def test_stage_and_upload( for assoc_fspath in assoc_fspaths: os.link( assoc_fspath, - associated_files_dir / f"{assoc_fspath.stem}-{i}{assoc_fspath.suffix}", + associated_files_dir + / f"{assoc_fspath.stem}-{i}{assoc_fspath.suffix}", ) # Create data store @@ -200,7 +201,7 @@ def test_stage_and_upload( # "info", "--raise-errors", "--delete", - ] + ], ) assert result.exit_code == 0, show_cli_trace(result) @@ -217,9 +218,9 @@ def test_stage_and_upload( "--raise-errors", ], env={ - "XNAT_INGEST_HOST": xnat_server, - "XNAT_INGEST_USER": "admin", - "XNAT_INGEST_PASS": "admin", + "XNAT_INGEST_UPLOAD_HOST": xnat_server, + "XNAT_INGEST_UPLOAD_USER": "admin", + "XNAT_INGEST_UPLOAD_PASS": "admin", }, )