From 5ddf236387bd524182b4b1cb617a87da5b2c36f9 Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Thu, 9 May 2024 15:30:57 -0500 Subject: [PATCH 1/6] - Working state but should be reimplemented --- Dockerfile | 12 +-- README.md | 1 - dmrpp_generator/main.py | 18 +++- main.tf | 4 +- modules/cumulus_ecs_testing/README.md | 33 ++++++ modules/cumulus_ecs_testing/main.tf | 119 ++++++++++++++++++++++ modules/cumulus_ecs_testing/outputs.tf | 3 + modules/cumulus_ecs_testing/variables.tf | 123 +++++++++++++++++++++++ modules/cumulus_ecs_testing/versions.tf | 5 + modules/dmrpp_lambda/variables.tf | 4 - modules/dmrpp_service/main.tf | 5 +- modules/dmrpp_service/variables.tf | 3 + variables.tf | 7 +- 13 files changed, 318 insertions(+), 19 deletions(-) create mode 100755 modules/cumulus_ecs_testing/README.md create mode 100755 modules/cumulus_ecs_testing/main.tf create mode 100755 modules/cumulus_ecs_testing/outputs.tf create mode 100755 modules/cumulus_ecs_testing/variables.tf create mode 100755 modules/cumulus_ecs_testing/versions.tf diff --git a/Dockerfile b/Dockerfile index 4f231c6..af25d46 100644 --- a/Dockerfile +++ b/Dockerfile @@ -27,12 +27,12 @@ COPY tests ./tests/ RUN pip install -r requirements.txt && \ python setup.py install -RUN coverage run -m pytest && \ - coverage report && \ - coverage lcov -o ./coverage/lcov.info && \ - rm -rf tests .coverage .pytest_cache && \ - pip uninstall pytest -y && \ - pip uninstall coverage -y +#RUN coverage run -m pytest && \ +# coverage report && \ +# coverage lcov -o ./coverage/lcov.info && \ +# rm -rf tests .coverage .pytest_cache && \ +# pip uninstall pytest -y && \ +# pip uninstall coverage -y RUN pip install --target $BUILD awslambdaric COPY site.conf /etc/bes/ diff --git a/README.md b/README.md index 090938d..fa97beb 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,6 @@ module "dmrpp-generator" { cluster_arn = module.cumulus.ecs_cluster_arn region = var.region prefix = var.prefix - account_id = var.account_id // Optional Activity Parameters docker_image = "ghrcdaac/dmrpp-generator:" // default to the correct release diff --git a/dmrpp_generator/main.py b/dmrpp_generator/main.py index 26ba157..2eca022 100644 --- a/dmrpp_generator/main.py +++ b/dmrpp_generator/main.py @@ -39,6 +39,7 @@ def __init__(self, **kwargs): super().__init__(**kwargs) self.path = self.path.rstrip('/') + "/" + self.path = '/efs/lambda/' # Enable logging the default is True enable_logging = os.getenv('ENABLE_CW_LOGGING', 'True') in [True, "true", "t", 1] self.dmrpp_version = f"DMRPP {__version__}" @@ -100,6 +101,7 @@ def process(self): Override the processing wrapper :return: """ + self.logger_to_cw.info(f'listdir: {os.listdir("/efs/lambda/")}') collection = self.config.get('collection') collection_files = collection.get('files', []) buckets = self.config.get('buckets') @@ -107,14 +109,21 @@ def process(self): for granule in granules: dmrpp_files = [] for file_ in granule['files']: + self.logger_to_cw.info(f'file: {file_}') if not search(f"{self.processing_regex}$", file_['fileName']): self.logger_to_cw.debug(f"{self.dmrpp_version}: regex {self.processing_regex}" f" does not match filename {file_['fileName']}") continue self.logger_to_cw.debug(f"{self.dmrpp_version}: regex {self.processing_regex}" f" matches filename to process {file_['fileName']}") - input_file_path = file_.get('filename', f's3://{file_["bucket"]}/{file_["key"]}') - output_file_paths = self.dmrpp_generate(input_file=input_file_path, dmrpp_meta=self.dmrpp_meta) + input_file_path = file_.get('fileName', 
f's3://{file_["bucket"]}/{file_["key"]}') + self.logger_to_cw.info(f'input_file_path: {input_file_path}') + temp = f'/efs/lambda/{input_file_path}' + local = os.path.isfile(temp) + self.logger_to_cw.info(f'local: {local}') + output_file_paths = self.dmrpp_generate( + input_file=temp, local=local, dmrpp_meta=self.dmrpp_meta + ) for output_file_path in output_file_paths: if output_file_path: @@ -127,6 +136,8 @@ def process(self): "type": self.get_file_type(output_file_basename, collection_files), } dmrpp_files.append(dmrpp_file) + upload_location = f's3://{dmrpp_file["bucket"]}/{dmrpp_file["key"]}' + self.logger_to_cw.info(f'upload_location: {upload_location}') self.upload_file_to_s3(output_file_path, f's3://{dmrpp_file["bucket"]}/{dmrpp_file["key"]}') if dmrpp_files == 0: @@ -139,6 +150,9 @@ def process(self): return self.input + def clean_all(self): + self.logger_to_cw.info('Not cleaning') + @staticmethod def strip_old_dmrpp_files(granule): # Remove old dmrpp files if they exist before adding new ones diff --git a/main.tf b/main.tf index 4a2ad99..7cd7e92 100644 --- a/main.tf +++ b/main.tf @@ -11,11 +11,13 @@ module "dmrpp_service" { volumes = var.volumes enable_cw_logging = var.enable_cw_logging get_dmrpp_timeout = var.get_dmrpp_timeout + + efs_fs_id = var.efs_fs_id + access_point_id = var.access_point_id } module "dmrpp_lambda" { source = "./modules/dmrpp_lambda" - account_id = var.account_id region = var.region prefix = var.prefix diff --git a/modules/cumulus_ecs_testing/README.md b/modules/cumulus_ecs_testing/README.md new file mode 100755 index 0000000..8608139 --- /dev/null +++ b/modules/cumulus_ecs_testing/README.md @@ -0,0 +1,33 @@ +# Cumulus - ECS service module + +**Note:** To prevent a race condition during service deletion, make sure to set +depends_on to the related aws_iam_role_policy; otherwise, the policy may be +destroyed too soon and the ECS service will then get stuck in the DRAINING +state. + +## Included resources + +Provides an ECS service and task definition, including autoscaling configuration and Cloudwatch alarms for monitoring. + +## Input variables + +See [variables.tf](./variables.tf) for the input variables to this module and the default values for optional variables. 
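+
+To make the race-condition note at the top of this README concrete, here is a
+minimal sketch of the `depends_on` guard (the role, policy, and surrounding
+resource names below are illustrative, not resources defined by this module):
+
+```hcl
+resource "aws_iam_role_policy" "ecs_task" {
+  name   = "my-prefix-ecs-task"
+  role   = aws_iam_role.ecs_task.id
+  policy = data.aws_iam_policy_document.ecs_task.json
+}
+
+module "guarded_ecs_service" {
+  source      = "../cumulus_ecs_testing"
+  prefix      = "my-prefix"
+  name        = "MyServiceName"
+  cluster_arn = aws_ecs_cluster.default.arn
+  image       = "cumuluss/cumulus-ecs-task:1.9.0"
+
+  # Keep the policy alive until the service is destroyed, so the service is
+  # not left stuck in DRAINING after its role loses permissions.
+  depends_on = [aws_iam_role_policy.ecs_task]
+}
+```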
+ +## Outputs + +- **service_name** - Name of the created ECS service + +## Example + +```hcl +module "example_ecs_service" { + source = "https://github.com/nasa/cumulus/releases/download/vx.x.x/terraform-aws-cumulus-ecs-service.zip" + + prefix = "my-prefix" + name = "MyServiceName" + + cluster_arn = "arn:aws:ecs:us-east-1:1234567890:cluster/MyECSCluster1" + desired_count = 1 + image = "cumuluss/cumulus-ecs-task:1.9.0" +} +``` diff --git a/modules/cumulus_ecs_testing/main.tf b/modules/cumulus_ecs_testing/main.tf new file mode 100755 index 0000000..051e936 --- /dev/null +++ b/modules/cumulus_ecs_testing/main.tf @@ -0,0 +1,119 @@ +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = "~> 5.0" + } + } +} + +locals { + cluster_name = reverse(split("/", var.cluster_arn))[0] + full_name = "${var.prefix}-${var.name}" +} + +data "aws_region" "current" {} + +resource "aws_cloudwatch_log_group" "default" { + name = "${local.full_name}EcsLogs" + retention_in_days = lookup(var.cloudwatch_log_retention_periods, "EcsLogs", var.default_log_retention_days) + tags = var.tags +} + +resource "aws_ecs_task_definition" "default" { + family = local.full_name + network_mode = var.network_mode + + container_definitions = jsonencode([ + { + name = local.full_name + cpu = var.cpu + essential = true +// mountPoints = [for k, v in var.volumes : { +// sourceVolume = v.name, +// containerPath = v.container_path +// }] + mountPoints = [ + { + containerPath = "/efs/" + sourceVolume = "efs_test" + } + ] + privileged = var.privileged + environment = [for k, v in var.environment : { + name = k, + value = v + }] + image = var.image + healthCheck = var.health_check + memoryReservation = var.memory_reservation + command = var.command + logConfiguration = { + logDriver = "awslogs" + options = { + awslogs-group = aws_cloudwatch_log_group.default.name + awslogs-region = data.aws_region.current.name + } + } + } + ]) + + volume { + name = "efs_test" + efs_volume_configuration { + file_system_id = var.efs_fs_id + } + } + + + dynamic "volume" { + for_each = var.volumes + content { + name = volume.value.name + host_path = volume.value.host_path + } + } + + tags = var.tags +} + +resource "aws_cloudwatch_log_subscription_filter" "default" { + count = var.log_destination_arn != null ? 1 : 0 + name = "${local.full_name}-default" + destination_arn = var.log_destination_arn + log_group_name = aws_cloudwatch_log_group.default.name + filter_pattern = "" +} + +resource "aws_ecs_service" "default" { + name = local.full_name + cluster = var.cluster_arn + desired_count = var.desired_count + task_definition = aws_ecs_task_definition.default.arn + deployment_maximum_percent = 100 + deployment_minimum_healthy_percent = 0 + # TODO Re-enable tags once this warning is addressed: + # The new ARN and resource ID format must be enabled to add tags to the + # service. Opt in to the new format and try again. 
+ # + # tags = var.tags +} + +resource "aws_cloudwatch_metric_alarm" "custom" { + for_each = var.alarms + + alarm_description = lookup(each.value, "description", null) + alarm_name = "${local.full_name}-${each.key}" + comparison_operator = each.value.comparison_operator + evaluation_periods = lookup(each.value, "evaluation_periods", 5) + metric_name = each.value.metric_name + statistic = lookup(each.value, "statistic", "Average") + threshold = each.value.threshold + period = lookup(each.value, "period", 60) + namespace = "AWS/ECS" + dimensions = { + ClusterName = local.cluster_name + ServiceName = aws_ecs_service.default.name + } + tags = var.tags +} diff --git a/modules/cumulus_ecs_testing/outputs.tf b/modules/cumulus_ecs_testing/outputs.tf new file mode 100755 index 0000000..67e5593 --- /dev/null +++ b/modules/cumulus_ecs_testing/outputs.tf @@ -0,0 +1,3 @@ +output "service_name" { + value = aws_ecs_service.default.name +} diff --git a/modules/cumulus_ecs_testing/variables.tf b/modules/cumulus_ecs_testing/variables.tf new file mode 100755 index 0000000..711498e --- /dev/null +++ b/modules/cumulus_ecs_testing/variables.tf @@ -0,0 +1,123 @@ +# Required + +variable "cluster_arn" { + description = "ARN of an ECS cluster" + type = string +} + +variable "image" { + description = "Image used to start the container. See https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_ContainerDefinition.html#ECS-Type-ContainerDefinition-image" + type = string +} + +variable "name" { + description = "ECS service name" + type = string +} + +variable "prefix" { + description = "The unique prefix for your deployment resources" + type = string +} + +# Optional + +variable "alarms" { + description = "Configuration for Cloudwatch alarms to monitor ECS, keyed by alarm name" + type = map(object({ comparison_operator = string, metric_name = string, threshold = number })) + default = {} +} + +variable "cloudwatch_log_retention_periods" { + type = map(number) + description = "retention periods for the respective cloudwatch log group, these values will be used instead of default retention days" + default = {} +} + +variable "default_log_retention_days" { + type = number + default = 30 + description = "default value that user chooses for their log retention periods" +} + +variable "command" { + description = "The command that is passed to the ECS container. Command is concatenated from a list of strings." + type = list(string) + default = null +} + +variable "cpu" { + description = "The number of CPU units the Amazon ECS container agent will reserve for the container" + type = number + default = 10 +} + +variable "desired_count" { + description = "Desired count of ECS cluster instances" + type = number + default = 0 +} + +variable "environment" { + description = "Environment variables to pass to the ECS container" + type = map(string) + default = {} +} + +variable "memory_reservation" { + description = "The soft limit (in MB) of memory to reserve for the container" + type = number + default = 256 +} + +variable "network_mode" { + description = "The Docker networking mode to use for the containers in the task" + type = string + default = "bridge" +} + +variable "privileged" { + description = "When this parameter is true, the container is given elevated privileges on the host container instance (similar to the root user)." 
+ type = bool + default = false +} + +variable "tags" { + description = "Tags to apply to deployed resources" + type = map(string) + default = {} +} + +variable "volumes" { + description = "Volumes to make accessible to the container(s)" + type = list(object({ name = string, host_path = string, container_path = string })) + default = [] +} + +variable "log_destination_arn" { + type = string + default = null + description = "A shared AWS:Log:Destination that receives logs in log_groups" +} + +variable "health_check" { + description = "Health check used by AWS ECS to determine containers health status. See https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_definition_parameters.html#container_definition_healthcheck" + type = object({ + command = list(string) + interval = number + timeout = number + retries = number + startPeriod = number + }) + default = null +} + +variable "efs_fs_id" { + type = string + default = "" +} + +variable "access_point_id" { + type = string + default = "" +} \ No newline at end of file diff --git a/modules/cumulus_ecs_testing/versions.tf b/modules/cumulus_ecs_testing/versions.tf new file mode 100755 index 0000000..7929e11 --- /dev/null +++ b/modules/cumulus_ecs_testing/versions.tf @@ -0,0 +1,5 @@ +terraform { + required_version = ">= 1.5" +} + + diff --git a/modules/dmrpp_lambda/variables.tf b/modules/dmrpp_lambda/variables.tf index bdeefcf..5450188 100644 --- a/modules/dmrpp_lambda/variables.tf +++ b/modules/dmrpp_lambda/variables.tf @@ -3,10 +3,6 @@ variable "region" { default = "us-west-2" } -variable "account_id" { - type = string -} - variable "prefix" { type = string description = "Cumulus stack prefix" diff --git a/modules/dmrpp_service/main.tf b/modules/dmrpp_service/main.tf index de3543f..949b6a8 100644 --- a/modules/dmrpp_service/main.tf +++ b/modules/dmrpp_service/main.tf @@ -10,7 +10,10 @@ module "dmrpp_ecs_task_module" { } module "dmrpp_service" { - source = "https://github.com/nasa/cumulus/releases/download/v18.2.0/terraform-aws-cumulus-ecs-service.zip" +// source = "https://github.com/nasa/cumulus/releases/download/v18.2.0/terraform-aws-cumulus-ecs-service.zip" + source = "../cumulus_ecs_testing" + efs_fs_id = var.efs_fs_id + access_point_id = var.access_point_id prefix = var.prefix name = "dmrpp_generator" diff --git a/modules/dmrpp_service/variables.tf b/modules/dmrpp_service/variables.tf index ddf5c20..426ab5d 100644 --- a/modules/dmrpp_service/variables.tf +++ b/modules/dmrpp_service/variables.tf @@ -15,3 +15,6 @@ variable "default_log_retention_days" { default = 30 description = "Default value that user chooses for their log retention periods" } + +variable "efs_fs_id" {} +variable "access_point_id" {} \ No newline at end of file diff --git a/variables.tf b/variables.tf index 1aca3ad..864ed02 100644 --- a/variables.tf +++ b/variables.tf @@ -3,10 +3,6 @@ variable "aws_profile" { default = null } -variable "account_id" { - type = string -} - variable "region" { type = string default = "us-west-2" @@ -86,3 +82,6 @@ variable "ephemeral_storage" { description = "Lambda /tmp storage limit" default = 512 } + +variable "efs_fs_id" {} +variable "access_point_id" {} From 31a5365e292a256b5df9fd00fdbc84847ea9a28a Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Fri, 31 May 2024 11:22:03 -0500 Subject: [PATCH 2/6] - Broken last state --- dmrpp_generator/main.py | 13 ++++++-- generate_dmrpp.py | 66 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 75 insertions(+), 4 deletions(-) diff --git a/dmrpp_generator/main.py 
b/dmrpp_generator/main.py index 2eca022..7172acd 100644 --- a/dmrpp_generator/main.py +++ b/dmrpp_generator/main.py @@ -1,3 +1,4 @@ +import json import logging import os import time @@ -39,7 +40,7 @@ def __init__(self, **kwargs): super().__init__(**kwargs) self.path = self.path.rstrip('/') + "/" - self.path = '/efs/lambda/' + self.path = '/mnt/ebs_test/' # Enable logging the default is True enable_logging = os.getenv('ENABLE_CW_LOGGING', 'True') in [True, "true", "t", 1] self.dmrpp_version = f"DMRPP {__version__}" @@ -101,11 +102,17 @@ def process(self): Override the processing wrapper :return: """ - self.logger_to_cw.info(f'listdir: {os.listdir("/efs/lambda/")}') + local_store = os.getenv('EBS_MNT') + with open(f'{local_store}/gdg_out.json', 'r') as output: + contents = json.load(output) + self.input = {'granules': contents.get('granules')} + + # self.logger_to_cw.info(f'listdir: {os.listdir("/efs/lambda/")}') collection = self.config.get('collection') collection_files = collection.get('files', []) buckets = self.config.get('buckets') granules = self.input['granules'] + print(f'processing {len(granules)} files...') for granule in granules: dmrpp_files = [] for file_ in granule['files']: @@ -241,7 +248,7 @@ def dmrpp_generate(self, input_file, local=False, dmrpp_meta=None, args=None): def main(event, context): print('main event') - print(event) + # print(event) return DMRPPGenerator(**event).process() diff --git a/generate_dmrpp.py b/generate_dmrpp.py index 17538ae..69dc93a 100644 --- a/generate_dmrpp.py +++ b/generate_dmrpp.py @@ -1,3 +1,7 @@ +import os +import signal +import sys +import time from json import JSONDecodeError from os import listdir, getenv from os.path import isfile, join, basename @@ -32,6 +36,18 @@ def try_json_decode(key, required_type): return os_var +class GracefulKiller: + kill_now = False + + def __init__(self): + signal.signal(signal.SIGINT, self.exit_gracefully) + signal.signal(signal.SIGTERM, self.exit_gracefully) + + def exit_gracefully(self, signum, frame): + print('Exiting gracefully') + self.kill_now = True + + def main(): meta = try_json_decode('PAYLOAD', {}) args = try_json_decode('DMRPP_ARGS', []) @@ -47,6 +63,54 @@ def main(): logger.info(f'Generated: {out_files}') +def handler(event, context): + # print(f'EVENT: {event}') + meta = event.get('config').get('collection').get('meta', {}) + args = [] + workstation_path = getenv('/efs_test/rssmif17d3d__7', '/usr/share/hyrax/') + # join_path = lambda x: join(workstation_path, x) + # input_files = [join_path(f) for f in listdir(workstation_path) if isfile(join_path(f))] + + input_files = [] + local_store = os.getenv('EBS_MNT') + for file in os.listdir(local_store): + if file.endswith('.nc'): + input_files.append(f'{local_store}/{file}') + print(f'input_files: {input_files}') + dmrpp = DMRPPGenerator(input=input_files) + dmrpp.path = local_store + dmrpp.processing_regex = meta.get('dmrpp_regex', dmrpp.processing_regex) + for input_file in input_files: + if search(f"{dmrpp.processing_regex}", basename(input_file)): + out_files = dmrpp.dmrpp_generate(input_file, local=True, dmrpp_meta=meta, args=args) + logger.info(f'Generated: {out_files}') + else: + logger.info(f'{dmrpp.processing_regex} did not match {input_file}') + return 0 + + if __name__ == "__main__": - main() + print(f'GDG argv: {sys.argv}') + if len(sys.argv) <= 1: + killer = GracefulKiller() + print('GDG Task is running...') + while not killer.kill_now: + time.sleep(1) + print('terminating') + else: + print('GDG calling function') + print(f'argv: 
{type(sys.argv[1])}') + print(f'argv: {sys.argv[1]}') + ret = handler(json.loads(sys.argv[1]), {}) + + # efs_dir = '/efs_test' + # # efs_dir = '/ebs_test' + # print(f'is {efs_dir} a directory: {os.path.isdir(efs_dir)}') + # filename = f'{efs_dir}/gdg_out.json' + # print(f'Creating file: {filename}') + # with open(filename, 'w+') as test_file: + # test_file.write(json.dumps(ret)) + # print(f'{efs_dir} contents: {os.listdir("/efs_test")}') + + # main() pass From da58f47955d67f2ff8e3bd33e3fd4c375a0b6039 Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Mon, 24 Jun 2024 10:43:02 -0500 Subject: [PATCH 3/6] - Latest state --- generate_dmrpp.py | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/generate_dmrpp.py b/generate_dmrpp.py index 69dc93a..d51589e 100644 --- a/generate_dmrpp.py +++ b/generate_dmrpp.py @@ -65,7 +65,8 @@ def main(): def handler(event, context): # print(f'EVENT: {event}') - meta = event.get('config').get('collection').get('meta', {}) + collection = event.get('config').get('collection') + meta = collection.get('meta', {}) args = [] workstation_path = getenv('/efs_test/rssmif17d3d__7', '/usr/share/hyrax/') # join_path = lambda x: join(workstation_path, x) @@ -73,9 +74,18 @@ def handler(event, context): input_files = [] local_store = os.getenv('EBS_MNT') - for file in os.listdir(local_store): - if file.endswith('.nc'): - input_files.append(f'{local_store}/{file}') + c_id = f'{collection.get("name")}__{collection.get("version")}' + collection_store = f'{local_store}/{c_id}' + + with open(f'{collection_store}/{c_id}.json', 'r') as output: + contents = json.load(output) + print(f'Granule Count: {len(contents.get("granules"))}') + granules = {'granules': contents.get('granules')} + + for granule in granules.get('granules'): + for file in granule.get('files'): + input_files.append(f'{local_store}/{file.get("name")}') + print(f'input_files: {input_files}') dmrpp = DMRPPGenerator(input=input_files) dmrpp.path = local_store @@ -103,14 +113,5 @@ def handler(event, context): print(f'argv: {sys.argv[1]}') ret = handler(json.loads(sys.argv[1]), {}) - # efs_dir = '/efs_test' - # # efs_dir = '/ebs_test' - # print(f'is {efs_dir} a directory: {os.path.isdir(efs_dir)}') - # filename = f'{efs_dir}/gdg_out.json' - # print(f'Creating file: {filename}') - # with open(filename, 'w+') as test_file: - # test_file.write(json.dumps(ret)) - # print(f'{efs_dir} contents: {os.listdir("/efs_test")}') - # main() pass From 019426ebfa63b28c6491d77fafc930d70192deed Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Wed, 18 Sep 2024 16:14:14 -0500 Subject: [PATCH 4/6] temp message --- Dockerfile | 1 + dmrpp_generator/main.py | 69 +++++++++++++++++++++++++++++++++++++---- generate_dmrpp.py | 67 +-------------------------------------- 3 files changed, 65 insertions(+), 72 deletions(-) diff --git a/Dockerfile b/Dockerfile index af25d46..dbc5819 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,6 +23,7 @@ RUN pip install ipython && \ COPY setup.py requirements*txt generate_dmrpp.py ./ COPY dmrpp_generator ./dmrpp_generator/ +COPY dmrpp_generator/handler.py ./dmrpp_generator/ COPY tests ./tests/ RUN pip install -r requirements.txt && \ python setup.py install diff --git a/dmrpp_generator/main.py b/dmrpp_generator/main.py index 7172acd..5f2ad5b 100644 --- a/dmrpp_generator/main.py +++ b/dmrpp_generator/main.py @@ -1,6 +1,8 @@ import json import logging import os +import re +import shutil import time from re import search import subprocess @@ -28,6 +30,7 @@ class 
DMRPPGenerator(Process): """ def __init__(self, **kwargs): + print(f'KWARGS: {kwargs}') config = kwargs.get('config', {}) # any keys on collection config override keys from workflow config self.dmrpp_meta = { @@ -35,12 +38,12 @@ def __init__(self, **kwargs): **config.get('collection', {}).get('meta', {}).get('dmrpp', {}), # from collection } self.processing_regex = self.dmrpp_meta.get( - 'dmrpp_regex', '.*\\.(((?i:(h|hdf)))(e)?5|nc(4)?)(\\.bz2|\\.gz|\\.Z)?' + 'dmrpp_regex', '.*(?i:nc)$' ) super().__init__(**kwargs) - self.path = self.path.rstrip('/') + "/" - self.path = '/mnt/ebs_test/' + # self.path = self.path.rstrip('/') + "/" + # self.path = '/mnt/ebs_test/' # Enable logging the default is True enable_logging = os.getenv('ENABLE_CW_LOGGING', 'True') in [True, "true", "t", 1] self.dmrpp_version = f"DMRPP {__version__}" @@ -98,6 +101,60 @@ def upload_file_to_s3(self, filename, uri): return s3.upload(filename, uri, extra={}) def process(self): + if 'EBS_MNT' in os.environ: + print('Using DAAC Split Processing') + ret = self.process_dmrpp_ebs() + else: + print('Using Cumulus Processing') + ret = self.process_cumulus() + + return ret + + def process_dmrpp_ebs(self): + collection = self.config.get('collection') + local_store = os.getenv('EBS_MNT') + c_id = f'{collection.get("name")}__{collection.get("version")}' + collection_store = f'{local_store}/{c_id}' + event_file = f'{collection_store}/{c_id}.json' + with open(event_file, 'r') as output: + contents = json.load(output) + print(f'Granule Count: {len(contents.get("granules"))}') + granules = {'granules': contents.get('granules')} + + for granule in granules.get('granules'): + dmrpp_files = [] + for file in granule.get('files'): + filename = file.get('name') + if not re.search(self.processing_regex, filename): + continue + else: + print(f'regex {self.processing_regex} matched file {filename}: {re.search(self.processing_regex, filename).group()}') + src = f'{collection_store}/{filename}' + dst = f'{self.path}/{filename}' + print(f'Copying: {src} -> {dst}') + shutil.copy(src, dst) + # file_path = f'{collection_store}/{filename}' + dmrpp_files = self.dmrpp_generate(dst, True, self.dmrpp_meta) + + for dmrpp in dmrpp_files: + dest = f'{collection_store}/{os.path.basename(dmrpp)}' + print(f'Copying: {dmrpp} -> {dest}') + shutil.copy(dmrpp, dest) + os.remove(dmrpp) + granule.get('files').append({ + 'name': os.path.basename(dest), + 'path': os.path.dirname(dest), + 'size': os.path.getsize(dest) + }) + + shutil.move(event_file, f'{event_file}.dmrpp.in') + with open(event_file, 'w+') as file: + file.write(json.dumps(granules)) + + print('DMR++ processing completed.') + return {"granules": granules, "input": self.output} + + def process_cumulus(self): """ Override the processing wrapper :return: @@ -247,9 +304,9 @@ def dmrpp_generate(self, input_file, local=False, dmrpp_meta=None, args=None): def main(event, context): - print('main event') - # print(event) - return DMRPPGenerator(**event).process() + # print(f'DMRPP main event: {event}') + kwargs = {'input': event.get('input'), 'config': event.get('config')} + return DMRPPGenerator(**kwargs).process() if __name__ == "__main__": diff --git a/generate_dmrpp.py b/generate_dmrpp.py index d51589e..17538ae 100644 --- a/generate_dmrpp.py +++ b/generate_dmrpp.py @@ -1,7 +1,3 @@ -import os -import signal -import sys -import time from json import JSONDecodeError from os import listdir, getenv from os.path import isfile, join, basename @@ -36,18 +32,6 @@ def try_json_decode(key, required_type): return os_var 
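+# NOTE: the CLI handler removed below now lives in
+# DMRPPGenerator.process_dmrpp_ebs() (dmrpp_generator/main.py in this patch).
+# Both read the same per-collection event file from the EBS mount. A minimal
+# sketch of its shape, inferred from this patch (only "granules", "files",
+# and "name" are read by this code; the filename shown is a placeholder):
+#
+#   {EBS_MNT}/{name}__{version}/{name}__{version}.json
+#   {
+#     "granules": [
+#       {"files": [{"name": "f17_ssmis_20240101v7.nc"}]}
+#     ]
+#   }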
-class GracefulKiller: - kill_now = False - - def __init__(self): - signal.signal(signal.SIGINT, self.exit_gracefully) - signal.signal(signal.SIGTERM, self.exit_gracefully) - - def exit_gracefully(self, signum, frame): - print('Exiting gracefully') - self.kill_now = True - - def main(): meta = try_json_decode('PAYLOAD', {}) args = try_json_decode('DMRPP_ARGS', []) @@ -63,55 +47,6 @@ def main(): logger.info(f'Generated: {out_files}') -def handler(event, context): - # print(f'EVENT: {event}') - collection = event.get('config').get('collection') - meta = collection.get('meta', {}) - args = [] - workstation_path = getenv('/efs_test/rssmif17d3d__7', '/usr/share/hyrax/') - # join_path = lambda x: join(workstation_path, x) - # input_files = [join_path(f) for f in listdir(workstation_path) if isfile(join_path(f))] - - input_files = [] - local_store = os.getenv('EBS_MNT') - c_id = f'{collection.get("name")}__{collection.get("version")}' - collection_store = f'{local_store}/{c_id}' - - with open(f'{collection_store}/{c_id}.json', 'r') as output: - contents = json.load(output) - print(f'Granule Count: {len(contents.get("granules"))}') - granules = {'granules': contents.get('granules')} - - for granule in granules.get('granules'): - for file in granule.get('files'): - input_files.append(f'{local_store}/{file.get("name")}') - - print(f'input_files: {input_files}') - dmrpp = DMRPPGenerator(input=input_files) - dmrpp.path = local_store - dmrpp.processing_regex = meta.get('dmrpp_regex', dmrpp.processing_regex) - for input_file in input_files: - if search(f"{dmrpp.processing_regex}", basename(input_file)): - out_files = dmrpp.dmrpp_generate(input_file, local=True, dmrpp_meta=meta, args=args) - logger.info(f'Generated: {out_files}') - else: - logger.info(f'{dmrpp.processing_regex} did not match {input_file}') - return 0 - - if __name__ == "__main__": - print(f'GDG argv: {sys.argv}') - if len(sys.argv) <= 1: - killer = GracefulKiller() - print('GDG Task is running...') - while not killer.kill_now: - time.sleep(1) - print('terminating') - else: - print('GDG calling function') - print(f'argv: {type(sys.argv[1])}') - print(f'argv: {sys.argv[1]}') - ret = handler(json.loads(sys.argv[1]), {}) - - # main() + main() pass From 8fac727de674e043e14f7aa962f6c8aab5844ba0 Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Thu, 17 Oct 2024 14:24:01 -0500 Subject: [PATCH 5/6] - Cleaned up implementation --- dmrpp_generator/main.py | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/dmrpp_generator/main.py b/dmrpp_generator/main.py index 3096ea8..2bd3f1b 100644 --- a/dmrpp_generator/main.py +++ b/dmrpp_generator/main.py @@ -127,12 +127,12 @@ def process_dmrpp_ebs(self): for granule in granules.get('granules'): dmrpp_files = [] for file in granule.get('files'): - filename = file.get('name') + filename = file.get('fileName') if not re.search(self.processing_regex, filename): continue else: print(f'regex {self.processing_regex} matched file {filename}: {re.search(self.processing_regex, filename).group()}') - src = f'{collection_store}/{filename}' + src = file.get('key') dst = f'{self.path}/{filename}' print(f'Copying: {src} -> {dst}') shutil.copy(src, dst) @@ -145,8 +145,8 @@ def process_dmrpp_ebs(self): shutil.copy(dmrpp, dest) os.remove(dmrpp) granule.get('files').append({ - 'name': os.path.basename(dest), - 'path': os.path.dirname(dest), + 'fileName': os.path.basename(dest), + 'key': dest, 'size': os.path.getsize(dest) }) @@ -162,12 +162,6 @@ def process_cumulus(self): Override the 
processing wrapper :return: """ - local_store = os.getenv('EBS_MNT') - with open(f'{local_store}/gdg_out.json', 'r') as output: - contents = json.load(output) - self.input = {'granules': contents.get('granules')} - - # self.logger_to_cw.info(f'listdir: {os.listdir("/efs/lambda/")}') collection = self.config.get('collection') collection_files = collection.get('files', []) buckets = self.config.get('buckets') @@ -184,14 +178,9 @@ def process_cumulus(self): continue self.logger_to_cw.debug(f"{self.dmrpp_version}: regex {self.processing_regex}" f" matches filename to process {file_['fileName']}") - input_file_path = file_.get('fileName', f's3://{file_["bucket"]}/{file_["key"]}') + input_file_path = f's3://{file_["bucket"]}/{file_["key"]}' self.logger_to_cw.info(f'input_file_path: {input_file_path}') - temp = f'/efs/lambda/{input_file_path}' - local = os.path.isfile(temp) - self.logger_to_cw.info(f'local: {local}') - output_file_paths = self.dmrpp_generate( - input_file=temp, local=local, dmrpp_meta=self.dmrpp_meta - ) + output_file_paths = self.dmrpp_generate(input_file=input_file_path, dmrpp_meta=self.dmrpp_meta) if not output_generated and len(output_file_paths) > 0: output_generated = True From 7d3a4836c1ffc65a4dfaf38220174a69ff7046ea Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Fri, 18 Oct 2024 09:51:56 -0500 Subject: [PATCH 6/6] Removing temporary ecs task, efs mount point config, and reenabling tests. --- Dockerfile | 12 +-- dmrpp_generator/handler.py | 33 ++++++ dmrpp_generator/main.py | 18 +--- main.tf | 3 - modules/cumulus_ecs_testing/README.md | 33 ------ modules/cumulus_ecs_testing/main.tf | 119 ---------------------- modules/cumulus_ecs_testing/outputs.tf | 3 - modules/cumulus_ecs_testing/variables.tf | 123 ----------------------- modules/cumulus_ecs_testing/versions.tf | 5 - modules/dmrpp_service/main.tf | 5 +- modules/dmrpp_service/variables.tf | 3 - variables.tf | 3 - 12 files changed, 44 insertions(+), 316 deletions(-) create mode 100644 dmrpp_generator/handler.py delete mode 100755 modules/cumulus_ecs_testing/README.md delete mode 100755 modules/cumulus_ecs_testing/main.tf delete mode 100755 modules/cumulus_ecs_testing/outputs.tf delete mode 100755 modules/cumulus_ecs_testing/variables.tf delete mode 100755 modules/cumulus_ecs_testing/versions.tf diff --git a/Dockerfile b/Dockerfile index fc149b4..9bb8609 100644 --- a/Dockerfile +++ b/Dockerfile @@ -28,12 +28,12 @@ COPY tests ./tests/ RUN pip install -r requirements.txt && \ python setup.py install -#RUN coverage run -m pytest && \ -# coverage report && \ -# coverage lcov -o ./coverage/lcov.info && \ -# rm -rf tests .coverage .pytest_cache && \ -# pip uninstall pytest -y && \ -# pip uninstall coverage -y +RUN coverage run -m pytest && \ + coverage report && \ + coverage lcov -o ./coverage/lcov.info && \ + rm -rf tests .coverage .pytest_cache && \ + pip uninstall pytest -y && \ + pip uninstall coverage -y RUN pip install --target $BUILD awslambdaric COPY site.conf /etc/bes/ diff --git a/dmrpp_generator/handler.py b/dmrpp_generator/handler.py new file mode 100644 index 0000000..df7f6cd --- /dev/null +++ b/dmrpp_generator/handler.py @@ -0,0 +1,33 @@ +import json +import signal +import sys +import time + +from dmrpp_generator.main import main + + +class GracefulKiller: + kill_now = False + + def __init__(self): + signal.signal(signal.SIGINT, self.exit_gracefully) + signal.signal(signal.SIGTERM, self.exit_gracefully) + + def exit_gracefully(self, signum, frame): + print('Exiting gracefully') + self.kill_now = True + + +if 
__name__ == "__main__": + print(f'DMR++ argv: {sys.argv}') + if len(sys.argv) <= 1: + killer = GracefulKiller() + print('DMR++ Task is running...') + while not killer.kill_now: + time.sleep(1) + print('terminating') + else: + print('DMR++ calling function') + print(f'argv: {type(sys.argv[1])}') + print(f'argv: {sys.argv[1]}') + main(json.loads(sys.argv[1]), {}) diff --git a/dmrpp_generator/main.py b/dmrpp_generator/main.py index 2bd3f1b..51f197a 100644 --- a/dmrpp_generator/main.py +++ b/dmrpp_generator/main.py @@ -30,7 +30,6 @@ class DMRPPGenerator(Process): """ def __init__(self, **kwargs): - print(f'KWARGS: {kwargs}') config = kwargs.get('config', {}) # any keys on collection config override keys from workflow config self.dmrpp_meta = { @@ -38,12 +37,11 @@ def __init__(self, **kwargs): **config.get('collection', {}).get('meta', {}).get('dmrpp', {}), # from collection } self.processing_regex = self.dmrpp_meta.get( - 'dmrpp_regex', '.*(?i:nc)$' + 'dmrpp_regex', '.*\\.(((?i:(h|hdf)))(e)?5|nc(4)?)(\\.bz2|\\.gz|\\.Z)?$' ) super().__init__(**kwargs) - # self.path = self.path.rstrip('/') + "/" - # self.path = '/mnt/ebs_test/' + self.path = self.path.rstrip('/') + "/" # Enable logging the default is True enable_logging = (os.getenv('ENABLE_CW_LOGGING', 'true').lower() == 'true') self.dmrpp_version = f"DMRPP {__version__}" @@ -133,10 +131,9 @@ def process_dmrpp_ebs(self): else: print(f'regex {self.processing_regex} matched file {filename}: {re.search(self.processing_regex, filename).group()}') src = file.get('key') - dst = f'{self.path}/{filename}' + dst = f'{self.path}{filename}' print(f'Copying: {src} -> {dst}') shutil.copy(src, dst) - # file_path = f'{collection_store}/{filename}' dmrpp_files = self.dmrpp_generate(dst, True, self.dmrpp_meta) for dmrpp in dmrpp_files: @@ -179,7 +176,6 @@ def process_cumulus(self): self.logger_to_cw.debug(f"{self.dmrpp_version}: regex {self.processing_regex}" f" matches filename to process {file_['fileName']}") input_file_path = f's3://{file_["bucket"]}/{file_["key"]}' - self.logger_to_cw.info(f'input_file_path: {input_file_path}') output_file_paths = self.dmrpp_generate(input_file=input_file_path, dmrpp_meta=self.dmrpp_meta) if not output_generated and len(output_file_paths) > 0: @@ -210,9 +206,7 @@ def process_cumulus(self): raise Exception('No dmrpp files were produced and verify_output was enabled.') return self.input - - def clean_all(self): - self.logger_to_cw.info('Not cleaning') + @staticmethod def strip_old_dmrpp_files(granule): @@ -287,10 +281,6 @@ def dmrpp_generate(self, input_file, local=False, dmrpp_meta=None, args=None): def main(event, context): - # print(f'DMRPP main event: {event}') - # kwargs = {'input': event.get('input'), 'config': event.get('config')} - # return DMRPPGenerator(**kwargs).process() - dmrpp = DMRPPGenerator(**event) try: ret = dmrpp.process() diff --git a/main.tf b/main.tf index 53b53d5..02b2641 100644 --- a/main.tf +++ b/main.tf @@ -11,9 +11,6 @@ module "dmrpp_service" { volumes = var.volumes enable_cw_logging = var.enable_cw_logging get_dmrpp_timeout = var.get_dmrpp_timeout - - efs_fs_id = var.efs_fs_id - access_point_id = var.access_point_id } module "dmrpp_lambda" { diff --git a/modules/cumulus_ecs_testing/README.md b/modules/cumulus_ecs_testing/README.md deleted file mode 100755 index 8608139..0000000 --- a/modules/cumulus_ecs_testing/README.md +++ /dev/null @@ -1,33 +0,0 @@ -# Cumulus - ECS service module - -**Note:** To prevent a race condition during service deletion, make sure to set -depends_on to the related 
aws_iam_role_policy; otherwise, the policy may be -destroyed too soon and the ECS service will then get stuck in the DRAINING -state. - -## Included resources - -Provides an ECS service and task definition, including autoscaling configuration and Cloudwatch alarms for monitoring. - -## Input variables - -See [variables.tf](./variables.tf) for the input variables to this module and the default values for optional variables. - -## Outputs - -- **service_name** - Name of the created ECS service - -## Example - -```hcl -module "example_ecs_service" { - source = "https://github.com/nasa/cumulus/releases/download/vx.x.x/terraform-aws-cumulus-ecs-service.zip" - - prefix = "my-prefix" - name = "MyServiceName" - - cluster_arn = "arn:aws:ecs:us-east-1:1234567890:cluster/MyECSCluster1" - desired_count = 1 - image = "cumuluss/cumulus-ecs-task:1.9.0" -} -``` diff --git a/modules/cumulus_ecs_testing/main.tf b/modules/cumulus_ecs_testing/main.tf deleted file mode 100755 index 051e936..0000000 --- a/modules/cumulus_ecs_testing/main.tf +++ /dev/null @@ -1,119 +0,0 @@ -terraform { - required_providers { - aws = { - source = "hashicorp/aws" - version = "~> 5.0" - } - } -} - -locals { - cluster_name = reverse(split("/", var.cluster_arn))[0] - full_name = "${var.prefix}-${var.name}" -} - -data "aws_region" "current" {} - -resource "aws_cloudwatch_log_group" "default" { - name = "${local.full_name}EcsLogs" - retention_in_days = lookup(var.cloudwatch_log_retention_periods, "EcsLogs", var.default_log_retention_days) - tags = var.tags -} - -resource "aws_ecs_task_definition" "default" { - family = local.full_name - network_mode = var.network_mode - - container_definitions = jsonencode([ - { - name = local.full_name - cpu = var.cpu - essential = true -// mountPoints = [for k, v in var.volumes : { -// sourceVolume = v.name, -// containerPath = v.container_path -// }] - mountPoints = [ - { - containerPath = "/efs/" - sourceVolume = "efs_test" - } - ] - privileged = var.privileged - environment = [for k, v in var.environment : { - name = k, - value = v - }] - image = var.image - healthCheck = var.health_check - memoryReservation = var.memory_reservation - command = var.command - logConfiguration = { - logDriver = "awslogs" - options = { - awslogs-group = aws_cloudwatch_log_group.default.name - awslogs-region = data.aws_region.current.name - } - } - } - ]) - - volume { - name = "efs_test" - efs_volume_configuration { - file_system_id = var.efs_fs_id - } - } - - - dynamic "volume" { - for_each = var.volumes - content { - name = volume.value.name - host_path = volume.value.host_path - } - } - - tags = var.tags -} - -resource "aws_cloudwatch_log_subscription_filter" "default" { - count = var.log_destination_arn != null ? 1 : 0 - name = "${local.full_name}-default" - destination_arn = var.log_destination_arn - log_group_name = aws_cloudwatch_log_group.default.name - filter_pattern = "" -} - -resource "aws_ecs_service" "default" { - name = local.full_name - cluster = var.cluster_arn - desired_count = var.desired_count - task_definition = aws_ecs_task_definition.default.arn - deployment_maximum_percent = 100 - deployment_minimum_healthy_percent = 0 - # TODO Re-enable tags once this warning is addressed: - # The new ARN and resource ID format must be enabled to add tags to the - # service. Opt in to the new format and try again. 
- # - # tags = var.tags -} - -resource "aws_cloudwatch_metric_alarm" "custom" { - for_each = var.alarms - - alarm_description = lookup(each.value, "description", null) - alarm_name = "${local.full_name}-${each.key}" - comparison_operator = each.value.comparison_operator - evaluation_periods = lookup(each.value, "evaluation_periods", 5) - metric_name = each.value.metric_name - statistic = lookup(each.value, "statistic", "Average") - threshold = each.value.threshold - period = lookup(each.value, "period", 60) - namespace = "AWS/ECS" - dimensions = { - ClusterName = local.cluster_name - ServiceName = aws_ecs_service.default.name - } - tags = var.tags -} diff --git a/modules/cumulus_ecs_testing/outputs.tf b/modules/cumulus_ecs_testing/outputs.tf deleted file mode 100755 index 67e5593..0000000 --- a/modules/cumulus_ecs_testing/outputs.tf +++ /dev/null @@ -1,3 +0,0 @@ -output "service_name" { - value = aws_ecs_service.default.name -} diff --git a/modules/cumulus_ecs_testing/variables.tf b/modules/cumulus_ecs_testing/variables.tf deleted file mode 100755 index 711498e..0000000 --- a/modules/cumulus_ecs_testing/variables.tf +++ /dev/null @@ -1,123 +0,0 @@ -# Required - -variable "cluster_arn" { - description = "ARN of an ECS cluster" - type = string -} - -variable "image" { - description = "Image used to start the container. See https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_ContainerDefinition.html#ECS-Type-ContainerDefinition-image" - type = string -} - -variable "name" { - description = "ECS service name" - type = string -} - -variable "prefix" { - description = "The unique prefix for your deployment resources" - type = string -} - -# Optional - -variable "alarms" { - description = "Configuration for Cloudwatch alarms to monitor ECS, keyed by alarm name" - type = map(object({ comparison_operator = string, metric_name = string, threshold = number })) - default = {} -} - -variable "cloudwatch_log_retention_periods" { - type = map(number) - description = "retention periods for the respective cloudwatch log group, these values will be used instead of default retention days" - default = {} -} - -variable "default_log_retention_days" { - type = number - default = 30 - description = "default value that user chooses for their log retention periods" -} - -variable "command" { - description = "The command that is passed to the ECS container. Command is concatenated from a list of strings." - type = list(string) - default = null -} - -variable "cpu" { - description = "The number of CPU units the Amazon ECS container agent will reserve for the container" - type = number - default = 10 -} - -variable "desired_count" { - description = "Desired count of ECS cluster instances" - type = number - default = 0 -} - -variable "environment" { - description = "Environment variables to pass to the ECS container" - type = map(string) - default = {} -} - -variable "memory_reservation" { - description = "The soft limit (in MB) of memory to reserve for the container" - type = number - default = 256 -} - -variable "network_mode" { - description = "The Docker networking mode to use for the containers in the task" - type = string - default = "bridge" -} - -variable "privileged" { - description = "When this parameter is true, the container is given elevated privileges on the host container instance (similar to the root user)." 
- type = bool - default = false -} - -variable "tags" { - description = "Tags to apply to deployed resources" - type = map(string) - default = {} -} - -variable "volumes" { - description = "Volumes to make accessible to the container(s)" - type = list(object({ name = string, host_path = string, container_path = string })) - default = [] -} - -variable "log_destination_arn" { - type = string - default = null - description = "A shared AWS:Log:Destination that receives logs in log_groups" -} - -variable "health_check" { - description = "Health check used by AWS ECS to determine containers health status. See https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_definition_parameters.html#container_definition_healthcheck" - type = object({ - command = list(string) - interval = number - timeout = number - retries = number - startPeriod = number - }) - default = null -} - -variable "efs_fs_id" { - type = string - default = "" -} - -variable "access_point_id" { - type = string - default = "" -} \ No newline at end of file diff --git a/modules/cumulus_ecs_testing/versions.tf b/modules/cumulus_ecs_testing/versions.tf deleted file mode 100755 index 7929e11..0000000 --- a/modules/cumulus_ecs_testing/versions.tf +++ /dev/null @@ -1,5 +0,0 @@ -terraform { - required_version = ">= 1.5" -} - - diff --git a/modules/dmrpp_service/main.tf b/modules/dmrpp_service/main.tf index 949b6a8..4fb6a68 100644 --- a/modules/dmrpp_service/main.tf +++ b/modules/dmrpp_service/main.tf @@ -10,10 +10,7 @@ module "dmrpp_ecs_task_module" { } module "dmrpp_service" { -// source = "https://github.com/nasa/cumulus/releases/download/v18.2.0/terraform-aws-cumulus-ecs-service.zip" - source = "../cumulus_ecs_testing" - efs_fs_id = var.efs_fs_id - access_point_id = var.access_point_id + source = "https://github.com/nasa/cumulus/releases/download/v18.4.0/terraform-aws-cumulus-ecs-service.zip" prefix = var.prefix name = "dmrpp_generator" diff --git a/modules/dmrpp_service/variables.tf b/modules/dmrpp_service/variables.tf index 426ab5d..ddf5c20 100644 --- a/modules/dmrpp_service/variables.tf +++ b/modules/dmrpp_service/variables.tf @@ -15,6 +15,3 @@ variable "default_log_retention_days" { default = 30 description = "Default value that user chooses for their log retention periods" } - -variable "efs_fs_id" {} -variable "access_point_id" {} \ No newline at end of file diff --git a/variables.tf b/variables.tf index 864ed02..d5b7f78 100644 --- a/variables.tf +++ b/variables.tf @@ -82,6 +82,3 @@ variable "ephemeral_storage" { description = "Lambda /tmp storage limit" default = 512 } - -variable "efs_fs_id" {} -variable "access_point_id" {}
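
A quick way to exercise the `dmrpp_generator/handler.py` entrypoint added in
patch 6 locally. This is a minimal sketch, assuming the package is importable
(installed via setup.py, as in the container image, or run from the repo
root); the event below carries only the keys this code path reads, and the
collection values are placeholders:

```python
import json
import subprocess

# Minimal Cumulus-style event: handler.py forwards argv[1] through
# json.loads() to dmrpp_generator.main.main(). Without EBS_MNT set in the
# environment, process() takes the regular Cumulus branch; an empty granule
# list makes this a smoke test only.
event = {
    "input": {"granules": []},
    "config": {
        "collection": {"name": "rssmif17d3d", "version": "7"},
        "buckets": {},
    },
}

# With no argument the container idles under GracefulKiller (the ECS service
# case); with one JSON argument it processes that event once and exits.
subprocess.run(
    ["python", "-m", "dmrpp_generator.handler", json.dumps(event)],
    check=True,
)
```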