From e0d4d2720ee55cd6aa9ce7658422211a6107652f Mon Sep 17 00:00:00 2001
From: Roy Willy Haug <5484176+roywilly@users.noreply.github.com>
Date: Mon, 20 Mar 2023 09:49:03 +0100
Subject: [PATCH] Upload seismic segy-files using OpenVDS SEGYImport (#141)

* Added functionality to upload SEGY files using OpenVDS SEGYImport binary

* Seismic OpenVDS upload bugfix and improvements

* Do not require OpenVDS for mac aka darwin

* Update to reflect new blob_url format returned from sumo-core

* Segy openvds upload and test code improvements

* Minor improvements seismic upload and test

* Fix bug on blob upload return values

* Very minor updates
---
 requirements/requirements.txt              |  3 +-
 src/fmu/sumo/uploader/_fileondisk.py       | 84 +++++++++++++++++-----
 src/fmu/sumo/uploader/_fileonjob.py        |  4 +-
 tests/data/test_case_080/.seismic.segy.yml |  4 +-
 tests/data/test_case_080/case_segy.yml     | 63 ++++++++++++++++
 tests/test_uploader.py                     | 84 ++++++++++++++++++++++
 6 files changed, 221 insertions(+), 21 deletions(-)
 create mode 100644 tests/data/test_case_080/case_segy.yml

diff --git a/requirements/requirements.txt b/requirements/requirements.txt
index d9b843e2..1de939e8 100644
--- a/requirements/requirements.txt
+++ b/requirements/requirements.txt
@@ -6,4 +6,5 @@ sumo-wrapper-python @ git+https://github.com/equinor/sumo-wrapper-python.git@mas
 xtgeo
 azure-core
 pyarrow; python_version > "3.6.1"
-# ert
\ No newline at end of file
+# ert
+OpenVDS; sys_platform != 'darwin'
diff --git a/src/fmu/sumo/uploader/_fileondisk.py b/src/fmu/sumo/uploader/_fileondisk.py
index 8d3d4813..4772861f 100644
--- a/src/fmu/sumo/uploader/_fileondisk.py
+++ b/src/fmu/sumo/uploader/_fileondisk.py
@@ -9,10 +9,11 @@
 import os
 import datetime
 import time
+import sys
+import subprocess
 import logging
 import hashlib
 import base64
-
 import yaml
 
 from sumo.wrapper._request_error import (
@@ -66,6 +67,26 @@ def _datetime_now():
     return datetime.datetime.now().isoformat()
 
 
+def _get_segyimport_cmdstr(blob_url, object_id, file_path, sample_unit):
+    """Return the command string for running OpenVDS SEGYImport"""
+    url = '"azureSAS:' + blob_url["baseuri"][6:] + '"'
+    url_conn = '"Suffix=?' + blob_url["auth"] + '"'
+    persistent_id = '"' + object_id + '"'
+
+    pythonPath = os.path.dirname(sys.executable)
+    path_to_SEGYImport = os.path.join(pythonPath, '..', 'bin', 'SEGYImport')
+
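+    # For illustration, with placeholder values the assembled command looks
+    # roughly like:
+    #   <python>/../bin/SEGYImport --compression-method RLE --brick-size 64
+    #     --sample-unit ms --url "azureSAS://<account>.blob.core.windows.net/<container>"
+    #     --url-connection "Suffix=?<sas-token>" --persistentID "<object_id>"
+    #     <path/to/file.segy>
+    # Note: blob_url["baseuri"][6:] drops the leading "https:" so the URI can
+    # be re-prefixed with the azureSAS: scheme that OpenVDS expects.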
+ blob_url["auth"] + '"' + persistent_id = '"' + object_id + '"' + + pythonPath = os.path.dirname(sys.executable) + path_to_SEGYImport = os.path.join(pythonPath, '..', 'bin', 'SEGYImport') + + cmdstr = ' '.join([path_to_SEGYImport, + '--compression-method', 'RLE', + '--brick-size', '64', + '--sample-unit', sample_unit, + '--url', url, + '--url-connection', url_conn, + '--persistentID', persistent_id, + file_path]) + return cmdstr + + class FileOnDisk: def __init__(self, path: str, metadata_path=None, verbosity="INFO"): """ @@ -93,12 +114,16 @@ def __init__(self, path: str, metadata_path=None, verbosity="INFO"): self.metadata["_sumo"] = {} - self.byte_string = file_to_byte_string(path) - self.metadata["_sumo"]["blob_size"] = len(self.byte_string) - digester = hashlib.md5(self.byte_string) - self.metadata["_sumo"]["blob_md5"] = base64.b64encode( - digester.digest() - ).decode("utf-8") + if self.metadata["data"]["format"] in ["openvds", "segy"]: + self.metadata["_sumo"]["blob_size"] = 0 + self.byte_string = None + else: + self.byte_string = file_to_byte_string(path) + self.metadata["_sumo"]["blob_size"] = len(self.byte_string) + digester = hashlib.md5(self.byte_string) + self.metadata["_sumo"]["blob_md5"] = base64.b64encode( + digester.digest() + ).decode("utf-8") def __repr__(self): if not self.metadata: @@ -165,6 +190,11 @@ def upload_to_sumo(self, sumo_parent_id, sumo_connection): result["blob_file_path"] = self.path result["blob_file_size"] = self.size + # Uploader converts segy-files to OpenVDS: + if self.metadata["data"]["format"] in ["openvds", "segy"]: + self.metadata["data"]["format"] = "openvds" + self.metadata["file"]["checksum_md5"] = "" + response = self._upload_metadata( sumo_connection=sumo_connection, sumo_parent_id=sumo_parent_id ) @@ -215,20 +245,42 @@ def upload_to_sumo(self, sumo_parent_id, sumo_connection): for i in backoff: logger.debug("backoff in inner loop is %s", str(i)) try: - response = self._upload_byte_string( - sumo_connection=sumo_connection, - object_id=self.sumo_object_id, - blob_url=blob_url, - ) - upload_response["status_code"] = response.status_code - upload_response["text"] = response.text + if self.metadata["data"]["format"] in ["openvds", "segy"]: + if sys.platform.startswith('darwin'): + # OpenVDS does not support Mac/darwin directly + # Outer code expects and interprets http error codes + upload_response["status_code"] = 418 + upload_response["text"] = "Can not perform SEGY upload since OpenVDS does not support Mac" + else: + if self.metadata["data"]["vertical_domain"] == 'depth': + sample_unit = 'm' + else: + sample_unit = 'ms' # aka time domain + cmd_str = _get_segyimport_cmdstr(blob_url, self.sumo_object_id, self.path, sample_unit) + cmd_result = subprocess.run(cmd_str, + capture_output=True, text=True, shell=True) + if cmd_result.returncode == 0: + upload_response["status_code"] = 200 + upload_response["text"] = "SEGY uploaded as OpenVDS." + else: + # Outer code expects and interprets http error codes + upload_response["status_code"] = 418 + upload_response["text"] = "FAILED SEGY upload as OpenVDS. 
" + cmd_result.stderr + else: + response = self._upload_byte_string( + sumo_connection=sumo_connection, + object_id=self.sumo_object_id, + blob_url=blob_url, + ) + upload_response["status_code"] = response.status_code + upload_response["text"] = response.text _t1_blob = time.perf_counter() result["blob_upload_response_status_code"] = upload_response[ "status_code" ] - result["blob_upload_response_text"] = upload_response["text"] + result["blob_upload_response_status_text"] = upload_response["text"] result["blob_upload_time_start"] = _t0_blob result["blob_upload_time_end"] = _t1_blob result["blob_upload_time_elapsed"] = _t1_blob - _t0_blob @@ -241,7 +293,7 @@ def upload_to_sumo(self, sumo_parent_id, sumo_connection): result["blob_upload_response_status_code"] = upload_response[ "status_code" ] - result["blob_upload_response_text"] = upload_response["text"] + result["blob_upload_response_status_text"] = upload_response["text"] result["blob_upload_time_start"] = _t0_blob result["blob_upload_time_end"] = _t1_blob result["blob_upload_time_elapsed"] = _t1_blob - _t0_blob diff --git a/src/fmu/sumo/uploader/_fileonjob.py b/src/fmu/sumo/uploader/_fileonjob.py index 4daddcc6..3f0699f3 100644 --- a/src/fmu/sumo/uploader/_fileonjob.py +++ b/src/fmu/sumo/uploader/_fileonjob.py @@ -159,7 +159,7 @@ def upload_to_sumo(self, sumo_parent_id, sumo_connection): result["blob_upload_response_status_code"] = upload_response[ "status_code" ] - result["blob_upload_response_text"] = upload_response["text"] + result["blob_upload_response_status_text"] = upload_response["text"] result["blob_upload_time_start"] = _t0_blob result["blob_upload_time_end"] = _t1_blob result["blob_upload_time_elapsed"] = _t1_blob - _t0_blob @@ -171,7 +171,7 @@ def upload_to_sumo(self, sumo_parent_id, sumo_connection): result["blob_upload_response_status_code"] = upload_response[ "status_code" ] - result["blob_upload_response_text"] = upload_response["text"] + result["blob_upload_response_status_text"] = upload_response["text"] result["blob_upload_time_start"] = _t0_blob result["blob_upload_time_end"] = _t1_blob result["blob_upload_time_elapsed"] = _t1_blob - _t0_blob diff --git a/tests/data/test_case_080/.seismic.segy.yml b/tests/data/test_case_080/.seismic.segy.yml index 83a1f1bf..54985bd4 100644 --- a/tests/data/test_case_080/.seismic.segy.yml +++ b/tests/data/test_case_080/.seismic.segy.yml @@ -29,8 +29,8 @@ fmu: reference: rms/structural_model case: - name: TestCase from fmu.sumo - uuid: 8bb56d60-8758-481a-89a4-6bac8561d38e # (pseudo-)random valid uuid4 + name: TestCase from fmu.sumo with segy and openvds + uuid: 8bb56d60-2222-5555-7547-6bac8561d38e # (pseudo-)random valid uuid4 user: id: testuser # $USER from ERT description: diff --git a/tests/data/test_case_080/case_segy.yml b/tests/data/test_case_080/case_segy.yml new file mode 100644 index 00000000..67a2718e --- /dev/null +++ b/tests/data/test_case_080/case_segy.yml @@ -0,0 +1,63 @@ +# example YAML file for ensembles (a parent object in Sumo) + +$schema: https://main-fmu-schemas-dev.radix.equinor.com/schemas/0.8.0/fmu_results.json +version: "0.8.0" # including for REP and others to build logic on. +source: fmu # always included, and always this value if FMU made it. For quick identification for external consumers. 
+class: case
+fmu:
+  case:
+    name: TestCase from fmu.sumo with segy and openvds
+    uuid: 8bb56d60-2222-5555-7547-6bac8561d38e # (pseudo-)random valid uuid4
+  user:
+    id: testuser # $USER from ERT
+  description:
+    - yet other detailed description
+    - optional
+  model:
+    name: ff
+    revision: 21.0.0.dev
+    description:
+      - detailed description
+      - optional
+
+# access:
+#   Level containing properties used for access control
+
+access:
+  asset:
+    name: Drogon # adding level to make room for unique ID in the future
+
+# ensemble objects have no ssdl details
+
+# masterdata:
+#   These are static data read from external config YAML file (fmuconfig or similar).
+#   Some of these static data are used while processing to populate some non-required
+#   fields in data block (for example names alias, color-tables)
+
+masterdata:
+  smda:
+    country:
+      - identifier: Norway
+        uuid: ad214d85-8a1d-19da-e053-c918a4889309
+    discovery:
+      - short_identifier: DROGON
+        uuid: ad214d85-8a1d-19da-e053-c918a4889309
+    field:
+      - identifier: DROGON
+        uuid: 00000000-0000-0000-0000-000000000000
+    coordinate_system:
+      identifier: ST_WGS84_UTM37N_P32637
+      uuid: ad214d85-dac7-19da-e053-c918a4889309
+    stratigraphic_column:
+      identifier: DROGON_2020
+      uuid: 12345678-1234-1234-1234-123456789012
+
+tracklog:
+  - datetime: 2020-10-28T14:28:02
+    user:
+      id: testuser
+    event: created
+  - datetime: 2020-10-28T14:46:14
+    user:
+      id: testuser
+    event: updated
diff --git a/tests/test_uploader.py b/tests/test_uploader.py
index e868c048..f4cc1035 100644
--- a/tests/test_uploader.py
+++ b/tests/test_uploader.py
@@ -4,9 +4,14 @@
 import time
 from pathlib import Path
 import logging
+import subprocess
+import json
 
 from fmu.sumo import uploader
 
+if not sys.platform.startswith('darwin'):
+    import openvds
+
 # run the tests from the root dir
 TEST_DIR = Path(__file__).parent / "../"
 os.chdir(TEST_DIR)
@@ -200,6 +205,85 @@ def test_wrong_metadata(token):
     assert total == 2
 
 
+@pytest.mark.skipif(sys.platform.startswith('darwin'), reason="do not run OpenVDS SEGYImport on mac os")
+def test_openvds_available(token):
+    python_path = os.path.dirname(sys.executable)
+    logger.info(python_path)
+    path_to_SEGYImport = os.path.join(python_path, '..', 'bin', 'SEGYImport')
+    logger.info(path_to_SEGYImport)
+    check_SEGYImport_version = subprocess.run([path_to_SEGYImport, '--version'],
+                                              capture_output=True, text=True)
+    assert check_SEGYImport_version.returncode == 0
+    assert "SEGYImport" in check_SEGYImport_version.stdout
+
+
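+# The test below exercises the full roundtrip: upload a SEGY file (converted
+# to OpenVDS by SEGYImport), verify the indexed metadata, export back to SEGY
+# with SEGYExport and compare file sizes, read the volume directly from blob
+# storage via the OpenVDS Python API, and finally delete the case and verify
+# that further reads fail.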
+@pytest.mark.skipif(sys.platform.startswith('darwin'), reason="do not run OpenVDS SEGYImport on mac os")
+def test_seismic_openvds_file(token):
+    """Upload seismic in OpenVDS format to Sumo. Assert that it is there."""
+    sumo_connection = uploader.SumoConnection(env=ENV, token=token)
+    e = uploader.CaseOnDisk(
+        case_metadata_path="tests/data/test_case_080/case_segy.yml",
+        sumo_connection=sumo_connection,
+    )
+    segy_filepath = "tests/data/test_case_080/seismic.segy"
+    e.register()
+    e.add_files(segy_filepath)
+    e.upload()
+
+    time.sleep(4)
+
+    query = f"_sumo.parent_object: {e.fmu_case_uuid}"
+    search_results = sumo_connection.api.get(
+        "/search", query=query, size=100, **{"from": 0}
+    )
+    total = search_results.get("hits").get("total").get("value")
+    assert total == 1
+
+    assert search_results.get("hits").get("hits")[0].get("_source").get("data").get("format") == "openvds"
+    assert search_results.get("hits").get("hits")[0].get("_source").get("file").get("checksum_md5") == ""
+
+    # Get SAS token to read from az blob store
+    child_id = search_results.get("hits").get("hits")[0].get("_id")
+    method = f"/objects('{child_id}')/blob/authuri"
+    token_results = sumo_connection.api.get(method)
+    url = '"azureSAS:' + json.loads(token_results.decode("utf-8")).get("baseuri")[6:] + child_id + '"'
+    url_conn = '"Suffix=?' + json.loads(token_results.decode("utf-8")).get("auth") + '"'
+
+    # Export from az blob store to a segy file on local disk
+    exported_filepath = "exported.segy"
+    if os.path.exists(exported_filepath):
+        os.remove(exported_filepath)
+    python_path = os.path.dirname(sys.executable)
+    path_to_SEGYExport = os.path.join(python_path, '..', 'bin', 'SEGYExport')
+    cmdstr = ' '.join([path_to_SEGYExport,
+                       '--url', url,
+                       '--connection', url_conn,
+                       'exported.segy'])
+    cmd_result = subprocess.run(cmdstr,
+                                capture_output=True, text=True, shell=True)
+    assert cmd_result.returncode == 0
+    assert os.path.isfile(exported_filepath)
+    assert os.stat(exported_filepath).st_size == os.stat(segy_filepath).st_size
+    if os.path.exists(exported_filepath):
+        os.remove(exported_filepath)
+
+    # Use OpenVDS Python API to read directly from az cloud storage
+    handle = openvds.open(url[1:-1], url_conn[1:-1])
+    layout = openvds.getLayout(handle)
+    channel_count = layout.getChannelCount()
+    assert channel_count == 3
+    assert layout.getChannelName(0) == "Amplitude"
+
+    # Delete this case
+    path = f"/objects('{e.fmu_case_uuid}')"
+    sumo_connection.api.delete(path=path)
+    time.sleep(30)  # Sumo removes the container
+
+    # OpenVDS reads should fail after deletion
+    with pytest.raises(RuntimeError, match="Error on downloading*"):
+        handle = openvds.open(url[1:-1], url_conn[1:-1])
+
+
 def test_teardown(token):
     """Teardown all testdata"""
     sumo_connection = uploader.SumoConnection(env=ENV, token=token)