diff --git a/src/fmu/sumo/config_jobs/SUMO_UPLOAD b/src/fmu/sumo/config_jobs/SUMO_UPLOAD deleted file mode 100644 index b45b9502..00000000 --- a/src/fmu/sumo/config_jobs/SUMO_UPLOAD +++ /dev/null @@ -1,23 +0,0 @@ --- This is the forward model job which uploads data to Sumo --- It is called from the ERT config file as a regular forward model - --- Arguments: --- SUMO_CASEPATH: The absolute path to the root of the case --- e.g. // --- SEARCHPATH: The searchpath relative to the realization root --- e.g "share/results/maps/*.gri" --- SUMO_ENV: The environment to upload to - -STDERR sumo_upload.stderr -STDOUT sumo_upload.stdout - -EXECUTABLE sumo_upload - - -ARGLIST - -MIN_ARG 2 -MAX_ARG 3 -ARG_TYPE 0 STRING -ARG_TYPE 1 STRING -ARG_TYPE 2 STRING diff --git a/src/fmu/sumo/hook_implementations/__init__.py b/src/fmu/sumo/hook_implementations/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/fmu/sumo/hook_implementations/jobs.py b/src/fmu/sumo/hook_implementations/jobs.py deleted file mode 100644 index 29afadf5..00000000 --- a/src/fmu/sumo/hook_implementations/jobs.py +++ /dev/null @@ -1,61 +0,0 @@ -import importlib -import os -from pkg_resources import resource_filename - -from ert.shared.plugins.plugin_manager import hook_implementation -from ert.shared.plugins.plugin_response import plugin_response - - -def _get_jobs_from_directory(directory): - """Do a filesystem lookup in a directory to check - for available ERT forward models""" - resource_directory = resource_filename("fmu", directory) - - all_files = [ - os.path.join(resource_directory, f) - for f in os.listdir(resource_directory) - if os.path.isfile(os.path.join(resource_directory, f)) - ] - return {os.path.basename(path): path for path in all_files} - - -# pylint: disable=no-value-for-parameter -@hook_implementation -@plugin_response(plugin_name="fmu_sumo") # pylint: disable=no-value-for-parameter -def installable_jobs(): - return _get_jobs_from_directory("sumo/config_jobs") - - -def _get_module_variable_if_exists(module_name, variable_name, default=""): - try: - script_module = importlib.import_module(module_name) - except ImportError: - return default - - return getattr(script_module, variable_name, default) - - -@hook_implementation -@plugin_response(plugin_name="fmu_sumo") # pylint: disable=no-value-for-parameter -def job_documentation(job_name): - sumo_fmu_jobs = set(installable_jobs().data.keys()) - if job_name not in sumo_fmu_jobs: - return None - - module_name = "jobs.scripts.{}".format(job_name.lower()) - - description = _get_module_variable_if_exists( - module_name=module_name, variable_name="description" - ) - examples = _get_module_variable_if_exists( - module_name=module_name, variable_name="examples" - ) - category = _get_module_variable_if_exists( - module_name=module_name, variable_name="category", default="other" - ) - - return { - "description": description, - "examples": examples, - "category": category, - } diff --git a/src/fmu/sumo/uploader/.gitignore b/src/fmu/sumo/uploader/.gitignore deleted file mode 100644 index e672f9da..00000000 --- a/src/fmu/sumo/uploader/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -*.py~ -__pycache__ diff --git a/src/fmu/sumo/uploader/__init__.py b/src/fmu/sumo/uploader/__init__.py deleted file mode 100644 index bcae7ed1..00000000 --- a/src/fmu/sumo/uploader/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -"""Top-level package for fmu.sumo.uploader""" - -try: - from .version import version - - __version__ = version -except ImportError: - __version__ = "0.0.0" - -from 
fmu.sumo.uploader.caseondisk import CaseOnDisk -from fmu.sumo.uploader.caseonjob import CaseOnJob -from fmu.sumo.uploader._connection import ( - SumoConnection, - SumoConnectionWithOutsideToken, -) - -# from fmu.sumo.uploader._fileondisk import FileOnDisk -# from fmu.sumo.uploader._fileonjob import FileOnJob diff --git a/src/fmu/sumo/uploader/_connection.py b/src/fmu/sumo/uploader/_connection.py deleted file mode 100644 index 24b6ad62..00000000 --- a/src/fmu/sumo/uploader/_connection.py +++ /dev/null @@ -1,68 +0,0 @@ -import logging - -from sumo.wrapper import SumoClient - - -class SumoConnection: - """Object to hold authentication towards Sumo""" - - def __init__(self, env=None, token=None): - self._api = None - self._env = env - self.token = token - - info = "Connection to Sumo on environment: {}".format(self.env) - logging.info(info) - - @property - def env(self): - if self._env is None: - self._env = "dev" - - return self._env - - @property - def api(self): - if self._api is None: - self._api = self._establish_connection() - - return self._api - - def refresh(self): - """Re-create the connection""" - self._api = self._establish_connection() - - def _establish_connection(self): - """Establish the connection with Sumo API, take user through 2FA.""" - return SumoClient(env=self.env, token=self.token) - - -class SumoConnectionWithOutsideToken: - """Object to hold authentication towards Sumo with outside access token""" - - def __init__(self, access_token, env=None): - self._api = None - self._env = env - self._access_token = access_token - - @property - def env(self): - if self._env is None: - self._env = "dev" - - return self._env - - @property - def api(self): - if self._api is None: - self._api = self._establish_connection() - - return self._api - - @property - def access_token(self): - return self._access_token - - def _establish_connection(self): - """Establish the connection with Sumo API with outside access token""" - return SumoClient(env=self.env, token=self._access_token) diff --git a/src/fmu/sumo/uploader/_fileondisk.py b/src/fmu/sumo/uploader/_fileondisk.py deleted file mode 100644 index 34f11148..00000000 --- a/src/fmu/sumo/uploader/_fileondisk.py +++ /dev/null @@ -1,350 +0,0 @@ -""" - - The FileOnDisk class objectifies a file as it appears - on the disk. A file in this context refers to a data/metadata - pair (technically two files). - -""" - -import os -import datetime -import time -import sys -import subprocess -import logging -import hashlib -import base64 -import yaml - -from sumo.wrapper._request_error import ( - AuthenticationError, - TransientError, - PermanentError, -) - -from azure.core.exceptions import ResourceExistsError - -# pylint: disable=C0103 # allow non-snake case variable names - -logger = logging.getLogger(__name__) -logger.setLevel(logging.CRITICAL) - - -def path_to_yaml_path(path): - """ - Given a path, return the corresponding yaml file path - according to FMU standards. - /my/path/file.txt --> /my/path/.file.txt.yaml - """ - - dir_name = os.path.dirname(path) - basename = os.path.basename(path) - - return os.path.join(dir_name, f".{basename}.yml") - - -def parse_yaml(path): - """From path, parse file as yaml, return data""" - with open(path, "r") as stream: - data = yaml.safe_load(stream) - - return data - - -def file_to_byte_string(path): - """ - Given an path to a file, read as bytes, return byte string. 
- """ - - with open(path, "rb") as f: - byte_string = f.read() - - return byte_string - - -def _datetime_now(): - """Return datetime now on FMU standard format""" - return datetime.datetime.now().isoformat() - - -def _get_segyimport_cmdstr(blob_url, object_id, file_path, sample_unit): - """Return the command string for running OpenVDS SEGYImport""" - try: - url = '"azureSAS:' + blob_url["baseuri"][6:] + '"' - url_conn = '"Suffix=?' + blob_url["auth"] + '"' - except: - url = '"azureSAS' + blob_url.split(object_id)[0][5:] + '"' # SEGYImport expects url to container - url_conn = '"Suffix=?' + blob_url.split("?")[1] + '"' - - persistent_id = '"' + object_id + '"' - - python_path = os.path.dirname(sys.executable) - path_to_SEGYImport = os.path.join(python_path, '..', 'bin', 'SEGYImport') - if sys.platform.startswith("win"): - path_to_SEGYImport = path_to_SEGYImport + ".exe" - if not os.path.isfile(path_to_SEGYImport): - path_to_SEGYImport = os.path.join(python_path, '..', 'shims', 'SEGYImport') - - cmdstr = ' '.join([path_to_SEGYImport, - '--compression-method', 'RLE', - '--brick-size', '64', - '--sample-unit', sample_unit, - '--url', url, - '--url-connection', url_conn, - '--persistentID', persistent_id, - file_path]) - return cmdstr - - -class FileOnDisk: - def __init__(self, path: str, metadata_path=None, verbosity="INFO"): - """ - path (str): Path to file - metadata_path (str): Path to metadata file. If not provided, - path will be derived from file path. - """ - - self.verbosity = verbosity - logger.setLevel(level=self.verbosity) - - self.metadata_path = metadata_path if metadata_path else path_to_yaml_path(path) - self.path = os.path.abspath(path) - self.metadata = parse_yaml(self.metadata_path) - - self._size = None - - self.basename = os.path.basename(self.path) - self.dir_name = os.path.dirname(self.path) - - self._file_format = None - - self.sumo_object_id = None - self.sumo_parent_id = None - - self.metadata["_sumo"] = {} - - if self.metadata["data"]["format"] in ["openvds", "segy"]: - self.metadata["_sumo"]["blob_size"] = 0 - self.byte_string = None - else: - self.byte_string = file_to_byte_string(path) - self.metadata["_sumo"]["blob_size"] = len(self.byte_string) - digester = hashlib.md5(self.byte_string) - self.metadata["_sumo"]["blob_md5"] = base64.b64encode( - digester.digest() - ).decode("utf-8") - - def __repr__(self): - if not self.metadata: - return f"\n# {self.__class__} \n# No metadata" - - s = f"\n# {self.__class__}" - s += f"\n# Disk path: {self.path}" - s += f"\n# Basename: {self.basename}" - if self.byte_string is not None: - s += f"\n# Byte string length: {len(self.byte_string)}" - - if not self.sumo_object_id is None: - s += f"\n# Uploaded to Sumo. 
Sumo_ID: {self.sumo_object_id}" - - return s - - @property - def size(self): - """Size of the file""" - if self._size is None: - self._size = os.path.getsize(self.path) - - return self._size - - def _upload_metadata(self, sumo_connection, sumo_parent_id): - path = f"/objects('{sumo_parent_id}')" - response = sumo_connection.api.post(path=path, json=self.metadata) - return response - - def _upload_byte_string(self, sumo_connection, object_id, blob_url): - response = sumo_connection.api.blob_client.upload_blob( - blob=self.byte_string, url=blob_url - ) - return response - - def _delete_metadata(self, sumo_connection, object_id): - path = f"/objects('{object_id}')" - response = sumo_connection.api.delete(path=path) - return response - - def upload_to_sumo(self, sumo_parent_id, sumo_connection): - """Upload this file to Sumo""" - - logger.debug("Starting upload_to_sumo()") - - if not sumo_parent_id: - raise ValueError( - f"Upload failed, missing sumo_parent_id. Got: {sumo_parent_id}" - ) - - _t0 = time.perf_counter() - _t0_metadata = time.perf_counter() - - result = {} - - backoff = [1, 3, 9] - - # UPLOAD METADATA - - for i in backoff: - logger.debug("backoff in outer loop is %s", str(i)) - - try: - - # We need these included even if returning before blob upload - result["blob_file_path"] = self.path - result["blob_file_size"] = self.size - - # Uploader converts segy-files to OpenVDS: - if self.metadata["data"]["format"] in ["openvds", "segy"]: - self.metadata["data"]["format"] = "openvds" - self.metadata["file"]["checksum_md5"] = "" - - response = self._upload_metadata( - sumo_connection=sumo_connection, sumo_parent_id=sumo_parent_id - ) - - _t1_metadata = time.perf_counter() - - result["metadata_upload_response_status_code"] = response.status_code - result["metadata_upload_response_text"] = response.text - result["metadata_upload_time_start"] = _t0_metadata - result["metadata_upload_time_end"] = _t1_metadata - result["metadata_upload_time_elapsed"] = _t1_metadata - _t0_metadata - result["metadata_file_path"] = self.metadata_path - result["metadata_file_size"] = self.size - - except TransientError as err: - logger.debug("TransientError on blob upload. 
Sleeping %s", str(i)) - result["status"] = "failed" - result["metadata_upload_response_status_code"] = err.code - result["metadata_upload_response_text"] = err.message - time.sleep(i) - continue - - except AuthenticationError as err: - result["status"] = "rejected" - result["metadata_upload_response_status_code"] = err.code - result["metadata_upload_response_text"] = err.message - return result - except PermanentError as err: - result["status"] = "rejected" - result["metadata_upload_response_status_code"] = err.code - result["metadata_upload_response_text"] = err.message - return result - - break - - if result["metadata_upload_response_status_code"] not in [200, 201]: - return result - - self.sumo_parent_id = sumo_parent_id - self.sumo_object_id = response.json().get("objectid") - - blob_url = response.json().get("blob_url") - - # UPLOAD BLOB - - - _t0_blob = time.perf_counter() - upload_response = {} - for i in backoff: - logger.debug("backoff in inner loop is %s", str(i)) - try: - if self.metadata["data"]["format"] in ["openvds", "segy"]: - if sys.platform.startswith('darwin'): - # OpenVDS does not support Mac/darwin directly - # Outer code expects and interprets http error codes - upload_response["status_code"] = 418 - upload_response["text"] = "Can not perform SEGY upload since OpenVDS does not support Mac" - else: - if self.metadata["data"]["vertical_domain"] == 'depth': - sample_unit = 'm' - else: - sample_unit = 'ms' # aka time domain - cmd_str = _get_segyimport_cmdstr(blob_url, self.sumo_object_id, self.path, sample_unit) - cmd_result = subprocess.run(cmd_str, - capture_output=True, text=True, shell=True) - if cmd_result.returncode == 0: - upload_response["status_code"] = 200 - upload_response["text"] = "SEGY uploaded as OpenVDS." - else: - # Outer code expects and interprets http error codes - upload_response["status_code"] = 418 - upload_response["text"] = "FAILED SEGY upload as OpenVDS. " + cmd_result.stderr - else: - response = self._upload_byte_string( - sumo_connection=sumo_connection, - object_id=self.sumo_object_id, - blob_url=blob_url, - ) - upload_response["status_code"] = response.status_code - upload_response["text"] = response.text - - _t1_blob = time.perf_counter() - - result["blob_upload_response_status_code"] = upload_response[ - "status_code" - ] - result["blob_upload_response_status_text"] = upload_response["text"] - result["blob_upload_time_start"] = _t0_blob - result["blob_upload_time_end"] = _t1_blob - result["blob_upload_time_elapsed"] = _t1_blob - _t0_blob - - except ResourceExistsError as err: - upload_response["status_code"] = 200 - upload_response["text"] = "File hopefully uploaded" - _t1_blob = time.perf_counter() - - result["blob_upload_response_status_code"] = upload_response[ - "status_code" - ] - result["blob_upload_response_status_text"] = upload_response["text"] - result["blob_upload_time_start"] = _t0_blob - result["blob_upload_time_end"] = _t1_blob - result["blob_upload_time_elapsed"] = _t1_blob - _t0_blob - - except OSError as err: - logger.debug("Upload failed: %s", str(err)) - result["status"] = "failed" - self._delete_metadata(sumo_connection, self.sumo_object_id) - return result - - except TransientError as err: - logger.debug("Got TransientError. 
Sleeping for %s seconds", str(i)) - result["status"] = "failed" - result["blob_upload_response_status_code"] = err.code - time.sleep(i) - continue - - except AuthenticationError as err: - logger.debug("Upload failed: %s", upload_response.get("text")) - result["status"] = "rejected" - self._delete_metadata(sumo_connection, self.sumo_object_id) - return result - - except PermanentError as err: - logger.debug("Upload failed: %s", upload_response["text"]) - result["status"] = "rejected" - self._delete_metadata(sumo_connection, self.sumo_object_id) - return result - - break - - if "status_code" not in upload_response or upload_response[ - "status_code" - ] not in [200, 201]: - logger.debug("Upload failed: %s", upload_response["text"]) - result["status"] = "failed" - self._delete_metadata(sumo_connection, self.sumo_object_id) - else: - result["status"] = "ok" - - return result - diff --git a/src/fmu/sumo/uploader/_fileonjob.py b/src/fmu/sumo/uploader/_fileonjob.py deleted file mode 100644 index 3f0699f3..00000000 --- a/src/fmu/sumo/uploader/_fileonjob.py +++ /dev/null @@ -1,208 +0,0 @@ -""" - - The FileOnDisk class objectifies a file as it appears - on the disk. A file in this context refers to a data/metadata - pair (technically two files). - -""" - -import os -import datetime -import time -import logging -import hashlib -import base64 -import tempfile -import json - -import yaml - -from sumo.wrapper._request_error import ( - AuthenticationError, - TransientError, - PermanentError, -) - -from azure.core.exceptions import ResourceExistsError - -# pylint: disable=C0103 # allow non-snake case variable names - -logger = logging.getLogger(__name__) -logger.setLevel(logging.CRITICAL) - -def parse_yaml(path): - """From path, parse file as yaml, return data""" - with open(path, "r") as stream: - data = yaml.safe_load(stream) - - return data - -class FileOnJob: - def __init__(self, byte_string: str, metadata): - """ - path (str): Path to file - metadata_path (str): Path to metadata file. If not provided, - path will be derived from file path. 
- """ - self.metadata = metadata - self._size = None - self._file_format = None - self.sumo_object_id = None - self.sumo_parent_id = None - - self.metadata["_sumo"] = {} - - self.byte_string = byte_string - self.metadata["_sumo"]["blob_size"] = len(self.byte_string) - digester = hashlib.md5(self.byte_string) - self.metadata["_sumo"]["blob_md5"] = base64.b64encode( - digester.digest() - ).decode("utf-8") - - # TODO hack - self.metadata["file"]["absolute_path"] = "" - self.metadata["file"]["checksum_md5"] = self.metadata["_sumo"]["blob_md5"] - - def _upload_metadata(self, sumo_connection, sumo_parent_id): - path = f"/objects('{sumo_parent_id}')" - response = sumo_connection.api.post(path=path, json=self.metadata) - return response - - def _upload_byte_string(self, sumo_connection, object_id, blob_url): - response = sumo_connection.api.blob_client.upload_blob(blob=self.byte_string, url=blob_url) - return response - - def _delete_metadata(self, sumo_connection, object_id): - path = f"/objects('{object_id}')" - response = sumo_connection.api.delete(path=path) - return response - - def upload_to_sumo(self, sumo_parent_id, sumo_connection): - """Upload this file to Sumo""" - - logger.debug("Starting upload_to_sumo()") - - if not sumo_parent_id: - raise ValueError( - f"Upload failed, sumo_parent_id passed to upload_to_sumo: {sumo_parent_id}" - ) - - _t0 = time.perf_counter() - _t0_metadata = time.perf_counter() - - result = {} - - backoff = [1, 3, 9] - - for i in backoff: - logger.debug("backoff in outer loop is %s", str(i)) - - try: - response = self._upload_metadata( - sumo_connection=sumo_connection, sumo_parent_id=sumo_parent_id - ) - - _t1_metadata = time.perf_counter() - - result["metadata_upload_response_status_code"] = response.status_code - result["metadata_upload_response_text"] = response.text - result["metadata_upload_time_start"] = _t0_metadata - result["metadata_upload_time_end"] = _t1_metadata - result["metadata_upload_time_elapsed"] = _t1_metadata - _t0_metadata - - except TransientError as err: - logger.debug("TransientError on blob upload. 
Sleeping %s", str(i)) - result["status"] = "failed" - result["metadata_upload_response_status_code"] = err.code - result["metadata_upload_response_text"] = err.message - time.sleep(i) - continue - - except AuthenticationError as err: - result["status"] = "rejected" - result["metadata_upload_response_status_code"] = err.code - result["metadata_upload_response_text"] = err.message - return result - except PermanentError as err: - result["status"] = "rejected" - result["metadata_upload_response_status_code"] = err.code - result["metadata_upload_response_text"] = err.message - return result - - break - - if result["metadata_upload_response_status_code"] not in [200, 201]: - return result - - self.sumo_parent_id = sumo_parent_id - self.sumo_object_id = response.json().get("objectid") - - blob_url = response.json().get("blob_url") - - # UPLOAD BLOB - - _t0_blob = time.perf_counter() - upload_response = {} - for i in backoff: - logger.debug("backoff in inner loop is %s", str(i)) - try: - response = self._upload_byte_string( - sumo_connection=sumo_connection, - object_id=self.sumo_object_id, - blob_url=blob_url, - ) - upload_response["status_code"] = response.status_code - upload_response["text"] = response.text - - _t1_blob = time.perf_counter() - - result["blob_upload_response_status_code"] = upload_response[ - "status_code" - ] - result["blob_upload_response_status_text"] = upload_response["text"] - result["blob_upload_time_start"] = _t0_blob - result["blob_upload_time_end"] = _t1_blob - result["blob_upload_time_elapsed"] = _t1_blob - _t0_blob - except ResourceExistsError as err: - upload_response["status_code"] = 200 - upload_response["text"] = "File hopefully uploaded to Oneseimic" - _t1_blob = time.perf_counter() - - result["blob_upload_response_status_code"] = upload_response[ - "status_code" - ] - result["blob_upload_response_status_text"] = upload_response["text"] - result["blob_upload_time_start"] = _t0_blob - result["blob_upload_time_end"] = _t1_blob - result["blob_upload_time_elapsed"] = _t1_blob - _t0_blob - - except OSError as err: - logger.debug("Upload failed: %s", str(err)) - result["status"] = "failed" - self._delete_metadata(sumo_connection, self.sumo_object_id) - return result - except TransientError as err: - logger.debug("Got TransientError. Sleeping for %i seconds", str(i)) - result["status"] = "failed" - time.sleep(i) - continue - except AuthenticationError as err: - logger.debug("Upload failed: %s", upload_response["text"]) - result["status"] = "rejected" - self._delete_metadata(sumo_connection, self.sumo_object_id) - return result - except PermanentError as err: - logger.debug("Upload failed: %s", upload_response["text"]) - result["status"] = "rejected" - self._delete_metadata(sumo_connection, self.sumo_object_id) - return result - - break - - if upload_response["status_code"] not in [200, 201]: - logger.debug("Upload failed: %s", upload_response["text"]) - result["status"] = "failed" - self._delete_metadata(sumo_connection, self.sumo_object_id) - else: - result["status"] = "ok" - return result diff --git a/src/fmu/sumo/uploader/_upload_files.py b/src/fmu/sumo/uploader/_upload_files.py deleted file mode 100644 index 36229546..00000000 --- a/src/fmu/sumo/uploader/_upload_files.py +++ /dev/null @@ -1,79 +0,0 @@ -""" - - The function that uploads files. 
- -""" - -from concurrent.futures import ThreadPoolExecutor - -# pylint: disable=C0103 # allow non-snake case variable names - - -def _upload_files(files, sumo_connection, sumo_parent_id, threads=4): - """ - Create threads and call _upload in each thread - """ - - with ThreadPoolExecutor(threads) as executor: - results = executor.map( - _upload_file, [(file, sumo_connection, sumo_parent_id) for file in files] - ) - - return results - - -def _upload_file(args): - """Upload a file""" - - file, sumo_connection, sumo_parent_id = args - - result = file.upload_to_sumo( - sumo_connection=sumo_connection, sumo_parent_id=sumo_parent_id - ) - - result["file"] = file - - return result - - -def upload_files(files: list, sumo_parent_id: str, sumo_connection, threads=4): - """ - Upload files - - files: list of FileOnDisk objects - sumo_parent_id: sumo_parent_id for the parent case - - Upload is kept outside classes to use multithreading. - """ - - results = _upload_files( - files=files, - sumo_connection=sumo_connection, - sumo_parent_id=sumo_parent_id, - threads=threads, - ) - - ok_uploads = [] - failed_uploads = [] - rejected_uploads = [] - - for r in results: - status = r.get("status") - - if not status: - raise ValueError('File upload result returned with no "status" attribute') - - if status == "ok": - ok_uploads.append(r) - - elif status == "rejected": - rejected_uploads.append(r) - - else: - failed_uploads.append(r) - - return { - "ok_uploads": ok_uploads, - "failed_uploads": failed_uploads, - "rejected_uploads": rejected_uploads, - } diff --git a/src/fmu/sumo/uploader/caseondisk.py b/src/fmu/sumo/uploader/caseondisk.py deleted file mode 100644 index 96121ecb..00000000 --- a/src/fmu/sumo/uploader/caseondisk.py +++ /dev/null @@ -1,539 +0,0 @@ -"""Objectify an FMU case (results) as it appears on the disk.""" - -import os -from pathlib import Path -import uuid -import glob -import time -import logging -import warnings -import datetime - -import yaml -import json -import pandas as pd -import hashlib -import base64 - -from fmu.sumo.uploader._fileondisk import FileOnDisk -from fmu.sumo.uploader._upload_files import upload_files -from fmu.dataio import ExportData -from fmu.dataio._utils import read_parameters_txt - - -logger = logging.getLogger(__name__) -logger.setLevel(logging.CRITICAL) - -# pylint: disable=C0103 # allow non-snake case variable names - - -class CaseOnDisk: - """ - Class to hold information about an ERT run on disk. - - The CaseOnDisk object is a representation of files belonging to an FMU case, - as they are stored on the Scratch disk. - - A Case in this context is a set of metadata describing this particular case, - and an arbitrary number of files belonging to this case. Each file is in reality - a file pair, consisting of a data file (could be any file type) and a metadata file - (yaml formatted, according) to FMU standards. 
- - Example for initialization: - >>> from fmu import sumo - - >>> env = 'dev' - >>> case_metadata_path = 'path/to/case_metadata.yaml' - >>> search_path = 'path/to/search_path/' - - >>> sumo_connection = sumo.SumoConnection(env=env) - >>> case = sumo.CaseOnDisk( - case_metadata_path=case_metadata_path, - sumo_connection=sumo_connection) - - After initialization, files must be explicitly indexed into the CaseOnDisk object: - - >>> case.add_files(search_path) - - When initialized, the case can be uploaded to Sumo: - - >>> case.upload() - - Args: - case_metadata_path (str): Path to the case_metadata file for the case - sumo_connection (fmu.sumo.SumoConnection): SumoConnection object - - - """ - - def __init__( - self, case_metadata_path: str, sumo_connection, verbosity="INFO" - ): - """Initialize CaseOnDisk. - - Args: - case_metadata_path (str): Path to case_metadata for case - sumo_connection (fmu.sumo.SumoConnection): Connection to Sumo. - verbosity (str): Python logging level. - """ - - self.verbosity = verbosity - logger.setLevel(level=verbosity) - - self.sumo_connection = sumo_connection - - logger.debug("case metadata path: %s", case_metadata_path) - self._case_metadata_path = Path(case_metadata_path) - self.case_metadata = _load_case_metadata(case_metadata_path) - self._fmu_case_uuid = self._get_fmu_case_uuid() - logger.debug("self._fmu_case_uuid is %s", self._fmu_case_uuid) - self._sumo_parent_id = self._get_sumo_parent_id() - logger.debug("self._sumo_parent_id is %s", self._sumo_parent_id) - self._files = [] - - self._sumo_logger = sumo_connection.api.getLogger("log_2_server_caseondisk") - self._sumo_logger.setLevel(logging.INFO) - # Avoid that logging to sumo-server also is visible in local logging: - self._sumo_logger.propagate = False - self._sumo_logger.info("Upload init for sumo_parent_id: " - + str(self._sumo_parent_id)) - - def __str__(self): - s = f"{self.__class__}, {len(self._files)} files." - - if self._sumo_parent_id is not None: - s += f"\nInitialized on Sumo. Sumo_ID: {self._sumo_parent_id}" - else: - s += "\nNot initialized on Sumo." - - return s - - def __repr__(self): - return str(self.__str__) - - @property - def sumo_parent_id(self): - """Return the sumo parent ID""" - return self._sumo_parent_id - - @property - def fmu_case_uuid(self): - """Return the fmu_case_uuid""" - return self._fmu_case_uuid - - @property - def files(self): - """Return the files""" - return self._files - - def add_files(self, search_string): - """Add files to the case, based on search string""" - - logger.info("Searching for files at %s", search_string) - file_paths = _find_file_paths(search_string) - - for file_path in file_paths: - try: - file = FileOnDisk(path=file_path, verbosity=self.verbosity) - self._files.append(file) - logger.info("File appended: %s", file_path) - - except IOError as err: - info = f"{err}. No metadata, skipping file." - warnings.warn(info) - - def _get_sumo_parent_id(self): - """Get the sumo parent ID. - - If parent id is cached on disk, use that. Else call sumo to get it based on - fmu_case_uuid.""" - - # If a relatively new cached file exists we use that and avoid calling Sumo - cached_key = "sumo-case-id" - cached_file = Path( - self._case_metadata_path.parent / "sumo_parent_id.yml" - ) - if cached_file.exists(): - file_age = ( - datetime.datetime.today() - - datetime.datetime.fromtimestamp(cached_file.lstat().st_mtime) - ) - if file_age.days < 1: - logger.debug( - "cached sumo_parent_id is less than 1 days, using it." 
- ) - with open(str(cached_file), "r") as infile: - filecontents = yaml.safe_load(infile) - sumo_parent_id = filecontents.get(cached_key) - logger.debug( - "Got sumo_parent_id from cache: %s", sumo_parent_id - ) - try: - test_uuid = uuid.UUID(sumo_parent_id) - logger.debug("Getting sumo parent id from cached file") - return sumo_parent_id - except ValueError: - pass # Not a valid uuid, will call Sumo - - # No valid cached file, need to call Sumo to get the parent id - query = f"class:case AND fmu.case.uuid:{self.fmu_case_uuid}" - search_results = self.sumo_connection.api.get( - "/search", query=query, size=2, **{"from": 0} - ) - - # To catch crazy rare situation when index is empty (first upload to new index) - if not search_results.get("hits"): - return None - - hits = search_results.get("hits").get("hits") - - if len(hits) == 0: - return None - - if len(hits) == 1: - sumo_parent_id = hits[0].get("_id") - - try: - # Cache the parent id in a file - my_dict = {cached_key: sumo_parent_id} - with open(str(cached_file), "w") as outfile: - yaml.dump(my_dict, outfile) - logger.debug("Caching sumo parent id") - except: - # Might be concurrency issues, just skip caching to file this time - pass - - return sumo_parent_id - - raise ValueError( - f"More than one hit for fmu.case.uuid {self.fmu_case_uuid} found on Sumo" - ) - - def register(self): - """Register this case on Sumo. - - Assumptions: If registering an already existing case, it will be overwritten. - ("register" might be a bad word for this...) - - Returns: - sumo_parent_id (uuid4): Unique ID for this case on Sumo - """ - - logger.info("Registering case on Sumo") - - sumo_parent_id = self._upload_case_metadata(self.case_metadata) - self._sumo_parent_id = sumo_parent_id - - logger.info("Case registered. 
SumoID: {}".format(sumo_parent_id)) - - return sumo_parent_id - - def _upload_case_metadata(self, case_metadata: dict): - """Upload case metadata to Sumo.""" - - response = self.sumo_connection.api.post( - path="/objects", json=case_metadata - ) - - returned_object_id = response.json().get("objectid") - - return returned_object_id - - def _get_fmu_case_uuid(self): - """Return case_id from case_metadata.""" - - fmu_case_uuid = self.case_metadata.get("fmu").get("case").get("uuid") - - if not fmu_case_uuid: - raise ValueError("Could not get fmu_case_uuid from case metadata") - - return fmu_case_uuid - - def upload_parameters_txt( - self, - glob_var_path: str = "./fmuconfig/output/global_variables.yml", - parameters_path: str = "./parameters.txt", - ): - """Upload parameters.txt if it is not present in Sumo for the current realization""" - logger.info("Uploading parameters.txt") - - fmu_id = self.fmu_case_uuid - realization_id = self.files[0].metadata["fmu"]["realization"]["uuid"] - query = f"fmu.case.uuid:{fmu_id} AND fmu.realization.uuid:{realization_id} AND data.content:parameters" - search_res = self.sumo_connection.api.get("/search", query=query) - - if search_res["hits"]["total"]["value"] == 0: - with open(glob_var_path, "r") as variables_yml: - global_config = yaml.safe_load(variables_yml) - - parameters = read_parameters_txt(parameters_path) - - exd = ExportData( - config=global_config, content="parameters", name="parameters" - ) - metadata = exd.generate_metadata(parameters) - - bytes = json.dumps(parameters).encode("utf-8") - digester = hashlib.md5(bytes) - md5 = base64.b64encode(digester.digest()).decode("utf-8") - metadata["_sumo"] = {"blob_size": len(bytes), "blob_md5": md5} - - upload_res = self.sumo_connection.api.post( - f"/objects('{fmu_id}')", json=metadata - ) - self.sumo_connection.api.blob_client.upload_blob( - blob=bytes, url=upload_res.json()["blob_url"] - ) - else: - logger.info("Parameters.txt already exists") - - def upload(self, threads=4, max_attempts=1, register_case=False): - """Trigger upload of files. - - Get sumo_parent_id. If None, case is not registered on Sumo. - - Upload all indexed files. Collect the files that have been uploaded OK, the - ones that have failed and the ones that have been rejected. - - Retry the failed uploads X times.""" - - if self.sumo_parent_id is None: - logger.info("Case is not registered on Sumo") - - if register_case: - self.register() - logger.info( - "Waiting 1 minute for Sumo to create the case container" - ) - time.sleep(20) # Wait for Sumo to create the container - else: - # We catch the situation where case is not registered on Sumo but - # an upload is attempted anyway. In the FMU context, this can happen - # if something goes wrong with the initial case metadata creation and - # upload. If, for some reason, this fails and the case is never uploaded - # to Sumo, we (currently) want this script to not fail (and stop the - # workflow). Outside FMU context, this can be different and we retain - # the possibility for allowing this script to register the case. - - logger.info( - "Case was not found on Sumo. If you are in the FMU context " - "something may have gone wrong with the case registration " - "or you have not specified that the case shall be uploaded." - "A warning will be issued, and the script will stop. " - "If you are NOT in the FMU context, you can specify that " - "this script also registers the case by passing " - "register=True. This should not be done in the FMU context." 
- ) - warnings.warn( - "Case is not registered on Sumo.", - UserWarning, - ) - return - - if not self.files: - raise FileExistsError("No files to upload. Check search string.") - - ok_uploads = [] - failed_uploads = [] - rejected_uploads = [] - files_to_upload = [f for f in self.files] - - attempts = 0 - _t0 = time.perf_counter() - - logger.debug("files_to_upload: %s", files_to_upload) - - while files_to_upload and attempts < max_attempts: - upload_results = upload_files( - files=files_to_upload, - sumo_parent_id=self.sumo_parent_id, - sumo_connection=self.sumo_connection, - threads=threads, - ) - - ok_uploads += upload_results.get("ok_uploads") - rejected_uploads += upload_results.get("rejected_uploads") - failed_uploads = upload_results.get("failed_uploads") - - if not failed_uploads: - break - - files_to_upload = [f.get("file") for f in failed_uploads] - - attempts += 1 - - time.sleep(3) - logger.debug( - "Retrying {} failed uploads after waiting 3 seconds".format( - len(failed_uploads) - ) - ) - - if failed_uploads: - warnings.warn("Stopping after {} attempts".format(attempts)) - - _dt = time.perf_counter() - _t0 - - upload_statistics = "" - if len(ok_uploads) > 0: - upload_statistics = _calculate_upload_stats(ok_uploads) - logger.info(upload_statistics) - - if rejected_uploads: - logger.info( - f"\n\n{len(rejected_uploads)} files rejected by Sumo. First 5 rejected files:" - ) - - for u in rejected_uploads[0:4]: - logger.info("\n" + "=" * 50) - - logger.info(f"Filepath: {u.get('blob_file_path')}") - logger.info( - f"Metadata: [{u.get('metadata_upload_response_status_code')}] " - f"{u.get('metadata_upload_response_text')}" - ) - logger.info( - f"Blob: [{u.get('blob_upload_response_status_code')}] " - f"{u.get('blob_upload_response_status_text')}" - ) - self._sumo_logger.info(_get_log_msg(self.sumo_parent_id, u)) - - if failed_uploads: - logger.info( - f"\n\n{len(failed_uploads)} files failed by Sumo. 
First 5 failed files:" - ) - - for u in failed_uploads[0:4]: - logger.info("\n" + "=" * 50) - - logger.info(f"Filepath: {u.get('blob_file_path')}") - logger.info( - f"Metadata: [{u.get('metadata_upload_response_status_code')}] " - f"{u.get('metadata_upload_response_text')}" - ) - logger.info( - f"Blob: [{u.get('blob_upload_response_status_code')}] " - f"{u.get('blob_upload_response_status_text')}" - ) - self._sumo_logger.info(_get_log_msg(self.sumo_parent_id, u)) - - logger.info("Summary:") - logger.info("Total files count: %s", str(len(self.files))) - logger.info("OK: %s", str(len(ok_uploads))) - logger.info("Failed: %s", str(len(failed_uploads))) - logger.info("Rejected: %s", str(len(rejected_uploads))) - logger.info("Wall time: %s sec", str(_dt)) - - summary = { - "upload_summary": { - "parent_id": self.sumo_parent_id, - "total_files_count": str(len(self.files)), - "ok_files": str(len(ok_uploads)), - "failed_files": str(len(failed_uploads)), - "rejected_files": str(len(rejected_uploads)), - "wall_time_seconds": str(_dt), - "upload_statistics": upload_statistics - } - } - self._sumo_logger.info(str(summary)) - - return ok_uploads - -def _get_log_msg(sumo_parent_id, status): - """Return a suitable logging for upload issues.""" - - json = { - "upload_issue": { - "case_uuid": str(sumo_parent_id), - "filepath": str(status.get('blob_file_path')), - "metadata": { - "status_code": str(status.get('metadata_upload_response_status_code')), - "response_text": status.get('metadata_upload_response_text') - }, - "blob": { - "status_code": str(status.get('blob_upload_response_status_code')), - "response_text": ((status.get('blob_upload_response_status_text'))) - } - } - } - return json - - -def _sanitize_datetimes(data): - """Sanitize datetimes. - - Given a dictionary, find and replace all datetime objects - with isoformat string, so that it does not cause problems for - JSON later on.""" - - if isinstance(data, datetime.datetime): - return data.isoformat() - - if isinstance(data, dict): - for key in data.keys(): - data[key] = _sanitize_datetimes(data[key]) - - elif isinstance(data, list): - data = [_sanitize_datetimes(element) for element in data] - - return data - - -def _load_case_metadata(case_metadata_path: str): - """Load the case metadata.""" - - if not os.path.isfile(case_metadata_path): - raise IOError(f"case metadata not found: {case_metadata_path}") - - with open(case_metadata_path, "r") as stream: - yaml_data = yaml.safe_load(stream) - - logger.debug("Sanitizing datetimes from loaded case metadata") - yaml_data = _sanitize_datetimes(yaml_data) - - return yaml_data - - -def _find_file_paths(search_string): - """Find files and return as list of FileOnDisk instances.""" - - files = [f for f in glob.glob(search_string) if os.path.isfile(f)] - - if len(files) == 0: - info = "No files found! Please, check the search string." - warnings.warn(info) - - info = f"Search string: {search_string}" - warnings.warn(info) - - return files - - -def _calculate_upload_stats(uploads): - """Calculate upload statistics. 
- - Given a list of results from file upload, calculate and return - timing statistics for uploads.""" - - df = pd.DataFrame().from_dict(uploads) - - stats = { - "blob": { - "upload_time": { - "mean": df["blob_upload_time_elapsed"].mean(), - "max": df["blob_upload_time_elapsed"].max(), - "min": df["blob_upload_time_elapsed"].min(), - "std": df["blob_upload_time_elapsed"].std(), - }, - }, - "metadata": { - "upload_time": { - "mean": df["metadata_upload_time_elapsed"].mean(), - "max": df["metadata_upload_time_elapsed"].max(), - "min": df["metadata_upload_time_elapsed"].min(), - "std": df["metadata_upload_time_elapsed"].std(), - }, - }, - } - - return stats \ No newline at end of file diff --git a/src/fmu/sumo/uploader/caseonjob.py b/src/fmu/sumo/uploader/caseonjob.py deleted file mode 100644 index 501b13e9..00000000 --- a/src/fmu/sumo/uploader/caseonjob.py +++ /dev/null @@ -1,241 +0,0 @@ -"""Objectify an FMU case (results), for usage on Radix jobs.""" - -import os -import glob -import time -import logging -import warnings -import datetime - -import yaml -import pandas as pd - -from fmu.sumo.uploader._fileonjob import FileOnJob -from fmu.sumo.uploader._upload_files import upload_files - -logger = logging.getLogger(__name__) -logger.setLevel(logging.CRITICAL) - -# pylint: disable=C0103 # allow non-snake case variable names - - -class CaseOnJob: - """Initialize the CaseOnJob object.""" - - def __init__(self, case_metadata: str, sumo_connection, verbosity="DEBUG"): - logger.setLevel(level=verbosity) - - self.sumo_connection = sumo_connection - self.case_metadata = case_metadata - self._fmu_case_uuid = self._get_fmu_case_uuid() - logger.debug("self._fmu_case_uuid is %s", self._fmu_case_uuid) - self._sumo_parent_id = self._get_sumo_parent_id() - logger.debug("self._sumo_parent_id is %s", self._sumo_parent_id) - self._files = [] - - @property - def sumo_parent_id(self): - return self._sumo_parent_id - - @property - def fmu_case_uuid(self): - return self._fmu_case_uuid - - @property - def files(self): - return self._files - - def add_files(self, byte_string, metadata): - try: - file = FileOnJob(byte_string=byte_string, metadata=metadata) - self._files.append(file) - except IOError as err: - info = f"{err}. No metadata, skipping file." - warnings.warn(info) - - def _get_sumo_parent_id(self): - """Get the sumo parent ID. - - Call sumo, check if the case is already there. Use fmu_case_uuid for this.""" - - query = f"fmu.case.uuid:{self.fmu_case_uuid}" - search_results = self.sumo_connection.api.get( - "/searchroot", query=query, size=2, **{"from": 0} - ) - - # To catch crazy rare situation when index is empty (first upload to new index) - if not search_results.get("hits"): - return None - - hits = search_results.get("hits").get("hits") - - if len(hits) == 0: - return None - - if len(hits) == 1: - sumo_parent_id = hits[0].get("_id") - return sumo_parent_id - - raise ValueError( - f"More than one hit for fmu.case.uuid {self.fmu_case_uuid} found on Sumo" - ) - - def _get_fmu_case_uuid(self): - """Return case_id from case_metadata.""" - - fmu_case_uuid = self.case_metadata.get("fmu").get("case").get("uuid") - - if not fmu_case_uuid: - raise ValueError("Could not get fmu_case_uuid from case metadata") - - return fmu_case_uuid - - def upload(self, threads=4, max_attempts=1): - """Trigger upload of files. - - Get sumo_parent_id. If None, case is not registered on Sumo. - - Upload all indexed files. Collect the files that have been uploaded OK, the - ones that have failed and the ones that have been rejected. 
- - Retry the failed uploads X times.""" - - if self.sumo_parent_id is None: - logger.info("Case is not registered on Sumo") - - if not self.files: - raise FileExistsError("No files to upload. Check search string.") - - ok_uploads = [] - failed_uploads = [] - rejected_uploads = [] - files_to_upload = [f for f in self.files] - - attempts = 0 - _t0 = time.perf_counter() - - while files_to_upload and attempts < max_attempts: - upload_results = upload_files( - files=files_to_upload, - sumo_parent_id=self.sumo_parent_id, - sumo_connection=self.sumo_connection, - threads=threads, - ) - - ok_uploads += upload_results.get("ok_uploads") - rejected_uploads += upload_results.get("rejected_uploads") - failed_uploads = upload_results.get("failed_uploads") - - if not failed_uploads: - break - - files_to_upload = [f.get("file") for f in failed_uploads] - - attempts += 1 - - time.sleep(3) - logger.debug( - "Retrying {} failed uploads after waiting 3 seconds".format( - len(failed_uploads) - ) - ) - - if failed_uploads: - warnings.warn("Stopping after {} attempts".format(attempts)) - - _dt = time.perf_counter() - _t0 - - if len(ok_uploads) > 0: - upload_statistics = _calculate_upload_stats(ok_uploads) - logger.info(upload_statistics) - - if failed_uploads: - logger.info(f"{len(failed_uploads)} files failed to be uploaded") - - for u in failed_uploads[0:4]: - logger.info("\n" + "=" * 50) - - logger.info(f"Filepath: {u.get('blob_file_path')}") - logger.info( - f"Metadata: [{u.get('metadata_upload_response_status_code')}] " - f"{u.get('metadata_upload_response_text')}" - ) - logger.info( - f"Blob: [{u.get('blob_upload_response_status_code')}] " - f"{u.get('blob_upload_response_status_text')}" - ) - - if rejected_uploads: - logger.info( - f"\n\n{len(rejected_uploads)} files rejected by Sumo. First 5 rejected files:" - ) - - for u in rejected_uploads[0:4]: - logger.info("\n" + "=" * 50) - - logger.info(f"Filepath: {u.get('blob_file_path')}") - logger.info( - f"Metadata: [{u.get('metadata_upload_response_status_code')}] " - f"{u.get('metadata_upload_response_text')}" - ) - logger.info( - f"Blob: [{u.get('blob_upload_response_status_code')}] " - f"{u.get('blob_upload_response_status_text')}" - ) - - if failed_uploads: - logger.info( - f"\n\n{len(failed_uploads)} files rejected by Sumo. First 5 rejected files:" - ) - - for u in failed_uploads[0:4]: - logger.info("\n" + "=" * 50) - - logger.info(f"Filepath: {u.get('blob_file_path')}") - logger.info( - f"Metadata: [{u.get('metadata_upload_response_status_code')}] " - f"{u.get('metadata_upload_response_text')}" - ) - logger.info( - f"Blob: [{u.get('blob_upload_response_status_code')}] " - f"{u.get('blob_upload_response_status_text')}" - ) - - logger.info("Summary:") - logger.info("Total files count: %s", str(len(self.files))) - logger.info("OK: %s", str(len(ok_uploads))) - logger.info("Failed: %s", str(len(failed_uploads))) - logger.info("Rejected: %s", str(len(rejected_uploads))) - logger.info("Wall time: %s sec", str(_dt)) - - return ok_uploads - - -def _calculate_upload_stats(uploads): - """Calculate upload statistics. 
- - Given a list of results from file upload, calculate and return - timing statistics for uploads.""" - - df = pd.DataFrame().from_dict(uploads) - - stats = { - "blob": { - "upload_time": { - "mean": df["blob_upload_time_elapsed"].mean(), - "max": df["blob_upload_time_elapsed"].max(), - "min": df["blob_upload_time_elapsed"].min(), - "std": df["blob_upload_time_elapsed"].std(), - }, - }, - "metadata": { - "upload_time": { - "mean": df["metadata_upload_time_elapsed"].mean(), - "max": df["metadata_upload_time_elapsed"].max(), - "min": df["metadata_upload_time_elapsed"].min(), - "std": df["metadata_upload_time_elapsed"].std(), - }, - }, - } - - return stats diff --git a/src/fmu/sumo/uploader/scripts/__init__.py b/src/fmu/sumo/uploader/scripts/__init__.py deleted file mode 100644 index 139597f9..00000000 --- a/src/fmu/sumo/uploader/scripts/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ - - diff --git a/src/fmu/sumo/uploader/scripts/sumo_upload.py b/src/fmu/sumo/uploader/scripts/sumo_upload.py deleted file mode 100644 index 9d010424..00000000 --- a/src/fmu/sumo/uploader/scripts/sumo_upload.py +++ /dev/null @@ -1,217 +0,0 @@ -#!/usr/bin/env python - -"""Upload data to Sumo from FMU.""" - -import warnings -import os -import argparse -import logging -from pathlib import Path - -try: - from ert.shared.plugins.plugin_manager import hook_implementation # type: ignore -except ModuleNotFoundError: - from ert_shared.plugins.plugin_manager import hook_implementation # type: ignore - -try: - from ert.shared.plugins.plugin_response import plugin_response # type: ignore -except ModuleNotFoundError: - from ert_shared.plugins.plugin_response import plugin_response # type: ignore - -try: - from ert import ErtScript # type: ignore -except ModuleNotFoundError: - from res.job_queue import ErtScript # type: ignore - -from fmu.sumo import uploader - -logger = logging.getLogger(__name__) -logger.setLevel(logging.CRITICAL) - -# This documentation is for sumo_uploader as an ERT workflow -DESCRIPTION = """SUMO_UPLOAD will upload files to Sumo. The typical use case is as add-on to -post-processing workflows which aggregate data across an ensemble and stores the -results outside the realization folders. - -SUMO_UPLOAD is implemented both as FORWARD_JOB and WORKFLOW_JOB and can be called from -both contexts when running ERT.""" - -EXAMPLES = """In an existing workflow e.g. ert/bin/workflows/MY_WORKFLOW with the contents - -MY_JOB -SUMO_UPLOAD /MyIteration/share/results/tables/*.csv - -where ``MY_JOB`` typically refers to a post-processing job creating data -and where typically refers to // - - is typically set in the config as it is used also by forward jobs. -It must refer to a valid Sumo environment. Normally this should be set to prod.""" - - -def main() -> None: - """Entry point from command line (e.g. ERT FORWARD_JOB).""" - - parser = _get_parser() - args = parser.parse_args() - - if args.verbose: - logger.setLevel(logging.INFO) - if args.debug: - logger.setLevel(logging.DEBUG) - - # Legacy? Still needed? 
- args.casepath = os.path.expandvars(args.casepath) - args.searchpath = os.path.expandvars(args.searchpath) - - _check_arguments(args) - - sumo_upload_main( - casepath=args.casepath, - searchpath=args.searchpath, - env=args.env, - metadata_path=args.metadata_path, - threads=args.threads, - ) - - -def sumo_upload_main( - casepath: str, searchpath: str, env: str, metadata_path: str, threads: int -) -> None: - """A "main" function that can be used both from command line and from ERT workflow""" - - logger.debug("Running fmu_uploader_main()") - - # Catch-all to ensure FMU workflow keeps running even if something happens. - # This should be a temporary solution to be re-evaluated in the future. - - try: - # establish the connection to Sumo - sumo_connection = uploader.SumoConnection(env=env) - logger.info("Connection to Sumo established") - - # initiate the case on disk object - logger.info("Case-relative metadata path is %s", metadata_path) - case_metadata_path = Path(casepath) / Path(metadata_path) - logger.info("case_metadata_path is %s", case_metadata_path) - - e = uploader.CaseOnDisk( - case_metadata_path=case_metadata_path, sumo_connection=sumo_connection - ) - # add files to the case on disk object - logger.info("Adding files. Search path is %s", searchpath) - e.add_files(searchpath) - logger.info("%s files has been added", str(len(e.files))) - - if len(e.files) == 0: - logger.debug("There are 0 (zero) files.") - warnings.warn("No files found - aborting ") - return - - # upload the indexed files - logger.info("Starting upload") - e.upload(threads=threads, register_case=False) - e.upload_parameters_txt() - logger.info("Upload done") - except Exception as err: - logger.info( - "Problem related to Sumo upload: " f"{err}" - ) - warnings.warn( - "Problem related to Sumo upload: " f"{err}") - _sumo_logger = sumo_connection.api.getLogger("log_2_server_sumo_upload") - _sumo_logger.propagate = False - _sumo_logger.warning("Problem related to Sumo upload for case: %s; %s", case_metadata_path, err) - return - - -class SumoUpload(ErtScript): - """A class with a run() function that can be registered as an ERT plugin. 
- - This is used for the ERT workflow context.""" - - # pylint: disable=too-few-public-methods - def run(self, *args): - # pylint: disable=no-self-use - """Parse with a simplified command line parser, for ERT only, - call sumo_upload_main()""" - logger.debug("Calling run() on SumoUpload") - parser = _get_parser() - args = parser.parse_args(args) - _check_arguments(args) - sumo_upload_main( - casepath=args.casepath, - searchpath=args.searchpath, - env=args.env, - metadata_path=args.metadata_path, - threads=args.threads, - ) - - -def _get_parser() -> argparse.ArgumentParser: - """Construct parser object for sumo_upload.""" - - parser = argparse.ArgumentParser() - parser.add_argument("casepath", type=str, help="Absolute path to case root") - parser.add_argument( - "searchpath", type=str, help="Absolute search path for files to upload" - ) - parser.add_argument("env", type=str, help="Sumo environment to use.") - parser.add_argument( - "--threads", type=int, help="Set number of threads to use.", default=2 - ) - parser.add_argument( - "--metadata_path", - type=str, - help="Case-relative path to case metadata", - default="share/metadata/fmu_case.yml", - ) - parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output") - parser.add_argument( - "--debug", action="store_true", help="Debug output, more verbose than --verbose" - ) - - return parser - - -def _check_arguments(args) -> None: - """Do sanity check of the input arguments.""" - - logger.debug("Running check_arguments()") - logger.debug("Arguments are: %s", str(vars(args))) - - if args.env not in ["preview", "dev", "test", "prod", "localhost"]: - warnings.warn(f"Non-standard environment: {args.env}") - - if not Path(args.casepath).is_absolute(): - if args.casepath.startswith("<") and args.casepath.endswith(">"): - ValueError("ERT variable is not defined: %s", args.casepath) - raise ValueError("Provided casepath must be an absolute path to the case root") - - if not Path(args.casepath).exists(): - raise ValueError("Provided case path does not exist") - - logger.debug("check_arguments() has ended") - - -@hook_implementation -def legacy_ertscript_workflow(config): - """Hook the SumoUpload class into ERT with the name SUMO_UPLOAD, - and inject documentation""" - workflow = config.add_workflow(SumoUpload, "SUMO_UPLOAD") - workflow.parser = _get_parser - workflow.description = DESCRIPTION - workflow.examples = EXAMPLES - workflow.category = "export" - - -@hook_implementation -@plugin_response(plugin_name="SUMO_UPLOAD") -def job_documentation(job_name): - if job_name != "SUMO_UPLOAD": - return None - - return { - "description": DESCRIPTION, - "examples": EXAMPLES, - "category": "export", - }
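
Note: for orientation, the sketch below summarizes the upload flow that the files removed in this diff provided. It is pieced together from the CaseOnDisk docstring, the sumo_upload entry point and the SUMO_UPLOAD job config deleted above; the environment name, metadata path and search pattern are illustrative placeholders, not values taken from any particular setup.

    # Minimal sketch of the flow implemented by the removed uploader module.
    from fmu.sumo import uploader

    # Authenticate against a Sumo environment ("dev" is the uploader's default).
    sumo_connection = uploader.SumoConnection(env="dev")

    # Wrap the FMU case as it sits on disk, using its case metadata file
    # (the default case-relative path used by the removed sumo_upload script).
    case = uploader.CaseOnDisk(
        case_metadata_path="share/metadata/fmu_case.yml",
        sumo_connection=sumo_connection,
    )

    # Index data/metadata file pairs matching a search pattern, then upload
    # them to the already registered case (register_case=True would have
    # registered the case first, which the FMU context did not allow).
    case.add_files("share/results/maps/*.gri")
    case.upload(threads=4, register_case=False)

The same flow was exposed to ERT through the removed SUMO_UPLOAD forward-model job (config_jobs/SUMO_UPLOAD, registered via hook_implementations/jobs.py) and the SUMO_UPLOAD workflow job defined in scripts/sumo_upload.py.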