Mlh0079 cloud ext #73

Merged 7 commits on Oct 18, 2024
11 changes: 6 additions & 5 deletions Dockerfile
@@ -23,16 +23,17 @@ RUN pip install ipython && \

COPY setup.py requirements*txt generate_dmrpp.py ./
COPY dmrpp_generator ./dmrpp_generator/
COPY dmrpp_generator/handler.py ./dmrpp_generator/
COPY tests ./tests/
RUN pip install -r requirements.txt && \
    python setup.py install

RUN coverage run -m pytest && \
    coverage report && \
    coverage lcov -o ./coverage/lcov.info && \
    rm -rf tests .coverage .pytest_cache && \
    pip uninstall pytest -y && \
    pip uninstall coverage -y
    coverage report && \
    coverage lcov -o ./coverage/lcov.info && \
    rm -rf tests .coverage .pytest_cache && \
    pip uninstall pytest -y && \
    pip uninstall coverage -y

RUN pip install --target $BUILD awslambdaric
COPY site.conf /etc/bes/
33 changes: 33 additions & 0 deletions dmrpp_generator/handler.py
@@ -0,0 +1,33 @@
import json
import signal
import sys
import time

from dmrpp_generator.main import main


class GracefulKiller:
    kill_now = False

    def __init__(self):
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)

    def exit_gracefully(self, signum, frame):
        print('Exiting gracefully')
        self.kill_now = True


if __name__ == "__main__":
    print(f'DMR++ argv: {sys.argv}')
    if len(sys.argv) <= 1:
        killer = GracefulKiller()
        print('DMR++ Task is running...')
        while not killer.kill_now:
            time.sleep(1)
        print('terminating')
    else:
        print('DMR++ calling function')
        print(f'argv: {type(sys.argv[1])}')
        print(f'argv: {sys.argv[1]}')
        main(json.loads(sys.argv[1]), {})
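
For reference, a minimal sketch of exercising the handler's one-shot mode locally; the event payload is hypothetical and it assumes the package is installed so main() can be imported:

import json
import subprocess

# hypothetical Cumulus-style event; real payloads carry the collection config and granule files
event = {"config": {"collection": {"name": "EXAMPLE", "version": "1"}}, "input": {"granules": []}}

# with a JSON argument the handler parses it and calls main(event, {}) once, then exits;
# with no argument it idles until SIGINT/SIGTERM (the long-running ECS service mode)
subprocess.run(["python", "-m", "dmrpp_generator.handler", json.dumps(event)], check=True)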
66 changes: 63 additions & 3 deletions dmrpp_generator/main.py
@@ -1,5 +1,8 @@
import json
import logging
import os
import re
import shutil
import time
from re import search
import subprocess
@@ -34,7 +37,7 @@ def __init__(self, **kwargs):
            **config.get('collection', {}).get('meta', {}).get('dmrpp', {}), # from collection
        }
        self.processing_regex = self.dmrpp_meta.get(
            'dmrpp_regex', '.*\\.(((?i:(h|hdf)))(e)?5|nc(4)?)(\\.bz2|\\.gz|\\.Z)?'
            'dmrpp_regex', '.*\\.(((?i:(h|hdf)))(e)?5|nc(4)?)(\\.bz2|\\.gz|\\.Z)?$'
        )

        super().__init__(**kwargs)
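
The only change to the default pattern is the trailing $ anchor on the second line; a quick illustration (filenames hypothetical) of the difference with re.search:

import re

anchored = r'.*\.(((?i:(h|hdf)))(e)?5|nc(4)?)(\.bz2|\.gz|\.Z)?$'

print(bool(re.search(anchored, 'granule.nc')))       # True
print(bool(re.search(anchored, 'granule.HDF5.gz')))  # True: case-insensitive extension plus compression suffix
print(bool(re.search(anchored, 'granule.nc.met')))   # False: without the $ anchor this ancillary file also matched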
@@ -99,6 +102,59 @@ def upload_file_to_s3(self, filename, uri):
        return s3.upload(filename, uri, extra={})

    def process(self):
        if 'EBS_MNT' in os.environ:
            print('Using DAAC Split Processing')
            ret = self.process_dmrpp_ebs()
        else:
            print('Using Cumulus Processing')
            ret = self.process_cumulus()

        return ret

    def process_dmrpp_ebs(self):
        collection = self.config.get('collection')
        local_store = os.getenv('EBS_MNT')
        c_id = f'{collection.get("name")}__{collection.get("version")}'
        collection_store = f'{local_store}/{c_id}'
        event_file = f'{collection_store}/{c_id}.json'
        with open(event_file, 'r') as output:
            contents = json.load(output)
        print(f'Granule Count: {len(contents.get("granules"))}')
        granules = {'granules': contents.get('granules')}

        for granule in granules.get('granules'):
            dmrpp_files = []
            for file in granule.get('files'):
                filename = file.get('fileName')
                if not re.search(self.processing_regex, filename):
                    continue
                else:
                    print(f'regex {self.processing_regex} matched file {filename}: {re.search(self.processing_regex, filename).group()}')
                src = file.get('key')
                dst = f'{self.path}{filename}'
                print(f'Copying: {src} -> {dst}')
                shutil.copy(src, dst)
                dmrpp_files = self.dmrpp_generate(dst, True, self.dmrpp_meta)

                for dmrpp in dmrpp_files:
                    dest = f'{collection_store}/{os.path.basename(dmrpp)}'
                    print(f'Copying: {dmrpp} -> {dest}')
                    shutil.copy(dmrpp, dest)
                    os.remove(dmrpp)
                    granule.get('files').append({
                        'fileName': os.path.basename(dest),
                        'key': dest,
                        'size': os.path.getsize(dest)
                    })

        shutil.move(event_file, f'{event_file}.dmrpp.in')
        with open(event_file, 'w+') as file:
            file.write(json.dumps(granules))

        print('DMR++ processing completed.')
        return {"granules": granules, "input": self.output}

    def process_cumulus(self):
        """
        Override the processing wrapper
        :return:
@@ -107,18 +163,19 @@ def process(self):
        collection_files = collection.get('files', [])
        buckets = self.config.get('buckets')
        granules = self.input['granules']

        print(f'processing {len(granules)} files...')
        output_generated = False
        for granule in granules:
            dmrpp_files = []
            for file_ in granule['files']:
                self.logger_to_cw.info(f'file: {file_}')
                if not search(f"{self.processing_regex}$", file_['fileName']):
                    self.logger_to_cw.debug(f"{self.dmrpp_version}: regex {self.processing_regex}"
                                            f" does not match filename {file_['fileName']}")
                    continue
                self.logger_to_cw.debug(f"{self.dmrpp_version}: regex {self.processing_regex}"
                                        f" matches filename to process {file_['fileName']}")
                input_file_path = file_.get('filename', f's3://{file_["bucket"]}/{file_["key"]}')
                input_file_path = f's3://{file_["bucket"]}/{file_["key"]}'
                output_file_paths = self.dmrpp_generate(input_file=input_file_path, dmrpp_meta=self.dmrpp_meta)

                if not output_generated and len(output_file_paths) > 0:
@@ -135,6 +192,8 @@ def process(self):
                        "type": self.get_file_type(output_file_basename, collection_files),
                    }
                    dmrpp_files.append(dmrpp_file)
                    upload_location = f's3://{dmrpp_file["bucket"]}/{dmrpp_file["key"]}'
                    self.logger_to_cw.info(f'upload_location: {upload_location}')
                    self.upload_file_to_s3(output_file_path, f's3://{dmrpp_file["bucket"]}/{dmrpp_file["key"]}')

            if len(dmrpp_files) == 0:
@@ -147,6 +206,7 @@ def process(self):
                raise Exception('No dmrpp files were produced and verify_output was enabled.')

        return self.input


    @staticmethod
    def strip_old_dmrpp_files(granule):
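
For orientation, a hedged sketch of the dmrpp_file record that process_cumulus() appends for each generated output and the upload location it derives; bucket, key, and type values are illustrative only:

# illustrative record; real values are resolved from the collection and buckets config
dmrpp_file = {
    "fileName": "G0001.nc.dmrpp",
    "bucket": "example-protected-bucket",
    "key": "EXAMPLE__1/G0001.nc.dmrpp",
    "type": "metadata",
}
upload_location = f's3://{dmrpp_file["bucket"]}/{dmrpp_file["key"]}'
# -> s3://example-protected-bucket/EXAMPLE__1/G0001.nc.dmrpp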
2 changes: 1 addition & 1 deletion modules/dmrpp_service/main.tf
@@ -10,7 +10,7 @@ module "dmrpp_ecs_task_module" {
}

module "dmrpp_service" {
  source = "https://github.com/nasa/cumulus/releases/download/v18.2.0/terraform-aws-cumulus-ecs-service.zip"
  source = "https://github.com/nasa/cumulus/releases/download/v18.4.0/terraform-aws-cumulus-ecs-service.zip"

  prefix = var.prefix
  name = "dmrpp_generator"