Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional environment vars corresponding to CLI options that didn't have them #4

Merged
merged 5 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions xnat_ingest/cli/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,29 @@
are uploaded to XNAT
""",
)
@click.argument("dicoms_path", type=str)
@click.argument("staging_dir", type=click.Path(path_type=Path))
@click.argument("dicoms_path", type=str, envvar="XNAT_INGEST_STAGE_DICOMS_PATH")
@click.argument(
"staging_dir", type=click.Path(path_type=Path), envvar="XNAT_INGEST_STAGE_DIR"
)
@click.option(
"--project-field",
type=DicomField,
default="StudyID",
envvar="XNAT_INGEST_PROJECT",
envvar="XNAT_INGEST_STAGE_PROJECT",
help=("The keyword or tag of the DICOM field to extract the XNAT project ID from "),
)
@click.option(
"--subject-field",
type=DicomField,
default="PatientID",
envvar="XNAT_INGEST_SUBJECT",
envvar="XNAT_INGEST_STAGE_SUBJECT",
help=("The keyword or tag of the DICOM field to extract the XNAT subject ID from "),
)
@click.option(
"--visit-field",
type=DicomField,
default="AccessionNumber",
envvar="XNAT_INGEST_SESSION",
envvar="XNAT_INGEST_STAGE_SESSION",
help=(
"The keyword or tag of the DICOM field to extract the XNAT imaging session ID from "
),
Expand All @@ -63,7 +65,7 @@
type=AssociatedFiles.cli_type,
nargs=2,
default=None,
envvar="XNAT_INGEST_ASSOCIATED",
envvar="XNAT_INGEST_STAGE_ASSOCIATED",
metavar="<glob> <id-pattern>",
help=(
'The "glob" arg is a glob pattern by which to detect associated files to be '
Expand All @@ -85,14 +87,14 @@
@click.option(
"--delete/--dont-delete",
default=False,
envvar="XNAT_INGEST_DELETE",
envvar="XNAT_INGEST_STAGE_DELETE",
help="Whether to delete the session directories after they have been uploaded or not",
)
@click.option(
"--log-level",
default="info",
type=str,
envvar="XNAT_INGEST_LOGLEVEL",
envvar="XNAT_INGEST_STAGE_LOGLEVEL",
help=("The level of the logging printed to stdout"),
)
@click.option(
Expand All @@ -101,7 +103,7 @@
type=LogFile.cli_type,
nargs=2,
metavar="<path> <loglevel>",
envvar="XNAT_INGEST_LOGFILE",
envvar="XNAT_INGEST_STAGE_LOGFILE",
help=(
'Location to write the output logs to, defaults to "upload-logs" in the '
"export directory"
Expand All @@ -114,7 +116,7 @@
nargs=3,
metavar="<address> <loglevel> <subject-preamble>",
multiple=True,
envvar="XNAT_INGEST_LOGEMAIL",
envvar="XNAT_INGEST_STAGE_LOGEMAIL",
help=(
"Email(s) to send logs to. When provided in an environment variable, "
"mail and log level are delimited by ',' and separate destinations by ';'"
Expand All @@ -126,7 +128,7 @@
nargs=4,
metavar="<host> <sender-email> <user> <password>",
default=None,
envvar="XNAT_INGEST_MAILSERVER",
envvar="XNAT_INGEST_STAGE_MAILSERVER",
help=(
"the mail server to send logger emails to. When provided in an environment variable, "
"args are delimited by ';'"
Expand All @@ -142,6 +144,7 @@
"--deidentify/--dont-deidentify",
default=False,
type=bool,
envvar="XNAT_INGEST_STAGE_DEIDENTIFY",
help="whether to deidentify the file names and DICOM metadata before staging",
)
def stage(
Expand Down
91 changes: 9 additions & 82 deletions xnat_ingest/cli/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,54 +14,6 @@
set_logger_handling,
)
from .base import cli
import os
import datetime
import boto3
import paramiko


def remove_old_files_on_s3(remote_store: str, threshold: int) -> None:
    """Delete objects in an S3 store that are more than ``threshold`` days old.

    Parameters
    ----------
    remote_store : str
        Location of the store. The first five characters are discarded
        (presumably the "s3://" scheme -- confirm against the caller) and the
        remainder is read as "<bucket>[/<prefix>]".
    threshold : int
        Age limit in whole days; objects whose age in whole days is strictly
        greater than this value are deleted.
    """
    # partition() tolerates a bucket-only store (no "/"), yielding an empty
    # prefix, where an unpacked split("/", 1) would raise ValueError.
    bucket_name, _, prefix = remote_store[5:].partition("/")

    s3_client = boto3.client("s3")

    # "LastModified" is returned as a timezone-aware datetime, so "now" must be
    # timezone-aware as well; subtracting naive from aware raises TypeError.
    now = datetime.datetime.now(datetime.timezone.utc)

    # A single list_objects_v2 call returns at most one page of keys, so walk
    # every page to make sure no old object is missed.
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        # "Contents" is absent from the response when nothing matches the prefix
        for obj in page.get("Contents", []):
            age = (now - obj["LastModified"]).days
            if age > threshold:
                s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"])


def remove_old_files_on_ssh(remote_store: str, threshold: int) -> None:
    """Delete files on an SSH-accessible store that are more than ``threshold`` days old.

    Parameters
    ----------
    remote_store : str
        Location of the store; it is split on the first "@" into the value
        passed to ``SSHClient.connect`` and the directory to clean.
        NOTE(review): for a "user@host:path" style address this split yields
        the user name as the server -- confirm the expected format with the
        caller.
    threshold : int
        Age limit in whole days; files whose age in whole days is strictly
        greater than this value are deleted.

    Raises
    ------
    RuntimeError
        If the remote clean-up command exits with a non-zero status.
    """
    import shlex

    server, directory = remote_store.split("@", 1)

    ssh_client = paramiko.SSHClient()
    ssh_client.load_system_host_keys()
    try:
        ssh_client.connect(server)

        # The age test has to run on the remote host: the listed paths do not
        # exist on the local filesystem, so a local os.path.getmtime() cannot
        # be used. "find -mtime +N" matches files whose age in whole 24-hour
        # periods is greater than N, i.e. the same rule as
        # "(now - mtime).days > threshold". The directory is shell-quoted so
        # spaces or metacharacters in it cannot alter the command.
        command = "find {} -type f -mtime +{:d} -exec rm -f {{}} +".format(
            shlex.quote(directory), int(threshold)
        )
        _, stdout, stderr = ssh_client.exec_command(command)

        # exec_command returns immediately; wait for the remote command to
        # finish so the connection is not closed while files are being removed.
        exit_status = stdout.channel.recv_exit_status()
        if exit_status != 0:
            raise RuntimeError(
                "Removing old files from {!r} failed with exit status {}: {}".format(
                    remote_store,
                    exit_status,
                    stderr.read().decode(errors="replace").strip(),
                )
            )
    finally:
        # Always release the connection, even if connecting or the command failed
        ssh_client.close()


@cli.command(
Expand All @@ -78,13 +30,13 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int):
an SSH server.
""",
)
@click.argument("staging_dir", type=str)
@click.argument("remote_store", type=str, envvar="XNAT_INGEST_REMOTE_STORE")
@click.argument("staging_dir", type=str, envvar="XNAT_INGEST_STAGE_DIR")
@click.argument("remote_store", type=str, envvar="XNAT_INGEST_TRANSFER_REMOTE_STORE")
@click.option(
"--store-credentials",
type=click.Path(path_type=Path),
metavar="<access-key> <secret-key>",
envvar="XNAT_INGEST_STORE_CREDENTIALS",
envvar="XNAT_INGEST_TRANSFER_STORE_CREDENTIALS",
default=None,
nargs=2,
help="Credentials to use to access of data stored in remote stores (e.g. AWS S3)",
Expand All @@ -93,7 +45,7 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int):
"--log-level",
default="info",
type=str,
envvar="XNAT_INGEST_LOGLEVEL",
envvar="XNAT_INGEST_TRANSFER_LOGLEVEL",
help=("The level of the logging printed to stdout"),
)
@click.option(
Expand All @@ -102,7 +54,7 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int):
type=LogFile.cli_type,
nargs=2,
metavar="<path> <loglevel>",
envvar="XNAT_INGEST_LOGFILE",
envvar="XNAT_INGEST_TRANSFER_LOGFILE",
help=(
'Location to write the output logs to, defaults to "upload-logs" in the '
"export directory"
Expand All @@ -115,7 +67,7 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int):
nargs=3,
metavar="<address> <loglevel> <subject-preamble>",
multiple=True,
envvar="XNAT_INGEST_LOGEMAIL",
envvar="XNAT_INGEST_TRANSFER_LOGEMAIL",
help=(
"Email(s) to send logs to. When provided in an environment variable, "
"mail and log level are delimited by ',' and separate destinations by ';'"
Expand All @@ -126,7 +78,7 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int):
type=MailServer.cli_type,
metavar="<host> <sender-email> <user> <password>",
default=None,
envvar="XNAT_INGEST_MAILSERVER",
envvar="XNAT_INGEST_TRANSFER_MAILSERVER",
help=(
"the mail server to send logger emails to. When provided in an environment variable, "
"args are delimited by ';'"
Expand All @@ -135,7 +87,7 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int):
@click.option(
"--delete/--dont-delete",
default=False,
envvar="XNAT_INGEST_DELETE",
envvar="XNAT_INGEST_TRANSFER_DELETE",
help="Whether to delete the session directories after they have been uploaded or not",
)
@click.option(
Expand All @@ -151,14 +103,7 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int):
default=None,
metavar="<host> <user> <password>",
help="The XNAT server to upload to plus the user and password to use",
envvar="XNAT_INGEST_XNAT_LOGIN",
)
@click.option(
"--clean-up-older-than",
type=int,
metavar="<days>",
default=0,
help="The number of days to keep files in the remote store for",
envvar="XNAT_INGEST_TRANSFER_XNAT_LOGIN",
)
def transfer(
staging_dir: Path,
Expand All @@ -171,7 +116,6 @@ def transfer(
delete: bool,
raise_errors: bool,
xnat_login: ty.Optional[ty.Tuple[str, str, str]],
clean_up_older_than: int,
):

if not staging_dir.exists():
Expand Down Expand Up @@ -281,23 +225,6 @@ def transfer(
logger.info("Deleting %s after successful upload", session_dir)
shutil.rmtree(session_dir)

if clean_up_older_than:
logger.info(
"Cleaning up files in %s older than %d days",
remote_store,
clean_up_older_than,
)
if store_type == "s3":
remove_old_files_on_s3(
remote_store=remote_store, threshold=clean_up_older_than
)
elif store_type == "ssh":
remove_old_files_on_ssh(
remote_store=remote_store, threshold=clean_up_older_than
)
else:
assert False


if __name__ == "__main__":
transfer()
Loading
Loading