Skip to content

Commit

Permalink
Upload seismic segy-files using OpenVDS SEGYImport (#141)
Browse files Browse the repository at this point in the history
* Added functionality to upload SEGY files using OpenVDS SEGYImport binary

* Seismic OpenVDS upload bugfix and improvements

* Do not require OpenVDS for mac aka darwin

* Update to reflect new blob_url format returned from sumo-core

* Segy openvds upload and test code improvements

* Minor improvements seismic upload and test

* Fix bug on blob upload return values

* Very minor updates
  • Loading branch information
roywilly authored Mar 20, 2023
1 parent a8e9702 commit e0d4d27
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 21 deletions.
3 changes: 2 additions & 1 deletion requirements/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ sumo-wrapper-python @ git+https://github.com/equinor/sumo-wrapper-python.git@mas
xtgeo
azure-core
pyarrow; python_version > "3.6.1"
# ert
# ert
OpenVDS; sys_platform != 'darwin'
84 changes: 68 additions & 16 deletions src/fmu/sumo/uploader/_fileondisk.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
import os
import datetime
import time
import sys
import subprocess
import logging
import hashlib
import base64

import yaml

from sumo.wrapper._request_error import (
Expand Down Expand Up @@ -66,6 +67,26 @@ def _datetime_now():
return datetime.datetime.now().isoformat()


def _get_segyimport_cmdstr(blob_url, object_id, file_path, sample_unit):
"""Return the command string for running OpenVDS SEGYImport"""
url = '"azureSAS:' + blob_url["baseuri"][6:] + '"'
url_conn = '"Suffix=?' + blob_url["auth"] + '"'
persistent_id = '"' + object_id + '"'

pythonPath = os.path.dirname(sys.executable)
path_to_SEGYImport = os.path.join(pythonPath, '..', 'bin', 'SEGYImport')

cmdstr = ' '.join([path_to_SEGYImport,
'--compression-method', 'RLE',
'--brick-size', '64',
'--sample-unit', sample_unit,
'--url', url,
'--url-connection', url_conn,
'--persistentID', persistent_id,
file_path])
return cmdstr


class FileOnDisk:
def __init__(self, path: str, metadata_path=None, verbosity="INFO"):
"""
Expand Down Expand Up @@ -93,12 +114,16 @@ def __init__(self, path: str, metadata_path=None, verbosity="INFO"):

self.metadata["_sumo"] = {}

self.byte_string = file_to_byte_string(path)
self.metadata["_sumo"]["blob_size"] = len(self.byte_string)
digester = hashlib.md5(self.byte_string)
self.metadata["_sumo"]["blob_md5"] = base64.b64encode(
digester.digest()
).decode("utf-8")
if self.metadata["data"]["format"] in ["openvds", "segy"]:
self.metadata["_sumo"]["blob_size"] = 0
self.byte_string = None
else:
self.byte_string = file_to_byte_string(path)
self.metadata["_sumo"]["blob_size"] = len(self.byte_string)
digester = hashlib.md5(self.byte_string)
self.metadata["_sumo"]["blob_md5"] = base64.b64encode(
digester.digest()
).decode("utf-8")

def __repr__(self):
if not self.metadata:
Expand Down Expand Up @@ -165,6 +190,11 @@ def upload_to_sumo(self, sumo_parent_id, sumo_connection):
result["blob_file_path"] = self.path
result["blob_file_size"] = self.size

# Uploader converts segy-files to OpenVDS:
if self.metadata["data"]["format"] in ["openvds", "segy"]:
self.metadata["data"]["format"] = "openvds"
self.metadata["file"]["checksum_md5"] = ""

response = self._upload_metadata(
sumo_connection=sumo_connection, sumo_parent_id=sumo_parent_id
)
Expand Down Expand Up @@ -215,20 +245,42 @@ def upload_to_sumo(self, sumo_parent_id, sumo_connection):
for i in backoff:
logger.debug("backoff in inner loop is %s", str(i))
try:
response = self._upload_byte_string(
sumo_connection=sumo_connection,
object_id=self.sumo_object_id,
blob_url=blob_url,
)
upload_response["status_code"] = response.status_code
upload_response["text"] = response.text
if self.metadata["data"]["format"] in ["openvds", "segy"]:
if sys.platform.startswith('darwin'):
# OpenVDS does not support Mac/darwin directly
# Outer code expects and interprets http error codes
upload_response["status_code"] = 418
upload_response["text"] = "Can not perform SEGY upload since OpenVDS does not support Mac"
else:
if self.metadata["data"]["vertical_domain"] == 'depth':
sample_unit = 'm'
else:
sample_unit = 'ms' # aka time domain
cmd_str = _get_segyimport_cmdstr(blob_url, self.sumo_object_id, self.path, sample_unit)
cmd_result = subprocess.run(cmd_str,
capture_output=True, text=True, shell=True)
if cmd_result.returncode == 0:
upload_response["status_code"] = 200
upload_response["text"] = "SEGY uploaded as OpenVDS."
else:
# Outer code expects and interprets http error codes
upload_response["status_code"] = 418
upload_response["text"] = "FAILED SEGY upload as OpenVDS. " + cmd_result.stderr
else:
response = self._upload_byte_string(
sumo_connection=sumo_connection,
object_id=self.sumo_object_id,
blob_url=blob_url,
)
upload_response["status_code"] = response.status_code
upload_response["text"] = response.text

_t1_blob = time.perf_counter()

result["blob_upload_response_status_code"] = upload_response[
"status_code"
]
result["blob_upload_response_text"] = upload_response["text"]
result["blob_upload_response_status_text"] = upload_response["text"]
result["blob_upload_time_start"] = _t0_blob
result["blob_upload_time_end"] = _t1_blob
result["blob_upload_time_elapsed"] = _t1_blob - _t0_blob
Expand All @@ -241,7 +293,7 @@ def upload_to_sumo(self, sumo_parent_id, sumo_connection):
result["blob_upload_response_status_code"] = upload_response[
"status_code"
]
result["blob_upload_response_text"] = upload_response["text"]
result["blob_upload_response_status_text"] = upload_response["text"]
result["blob_upload_time_start"] = _t0_blob
result["blob_upload_time_end"] = _t1_blob
result["blob_upload_time_elapsed"] = _t1_blob - _t0_blob
Expand Down
4 changes: 2 additions & 2 deletions src/fmu/sumo/uploader/_fileonjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def upload_to_sumo(self, sumo_parent_id, sumo_connection):
result["blob_upload_response_status_code"] = upload_response[
"status_code"
]
result["blob_upload_response_text"] = upload_response["text"]
result["blob_upload_response_status_text"] = upload_response["text"]
result["blob_upload_time_start"] = _t0_blob
result["blob_upload_time_end"] = _t1_blob
result["blob_upload_time_elapsed"] = _t1_blob - _t0_blob
Expand All @@ -171,7 +171,7 @@ def upload_to_sumo(self, sumo_parent_id, sumo_connection):
result["blob_upload_response_status_code"] = upload_response[
"status_code"
]
result["blob_upload_response_text"] = upload_response["text"]
result["blob_upload_response_status_text"] = upload_response["text"]
result["blob_upload_time_start"] = _t0_blob
result["blob_upload_time_end"] = _t1_blob
result["blob_upload_time_elapsed"] = _t1_blob - _t0_blob
Expand Down
4 changes: 2 additions & 2 deletions tests/data/test_case_080/.seismic.segy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ fmu:
reference: rms/structural_model

case:
name: TestCase from fmu.sumo
uuid: 8bb56d60-8758-481a-89a4-6bac8561d38e # (pseudo-)random valid uuid4
name: TestCase from fmu.sumo with segy and openvds
uuid: 8bb56d60-2222-5555-7547-6bac8561d38e # (pseudo-)random valid uuid4
user:
id: testuser # $USER from ERT
description:
Expand Down
63 changes: 63 additions & 0 deletions tests/data/test_case_080/case_segy.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# example YAML file for ensembles (a parent object in Sumo)

$schema: https://main-fmu-schemas-dev.radix.equinor.com/schemas/0.8.0/fmu_results.json
version: "0.8.0" # including for REP and others to build logic on.
source: fmu # always included, and always this value if FMU made it. For quick identification for external consumers.
class: case
fmu:
case:
name: TestCase from fmu.sumo with segy and openvds
  uuid: 8bb56d60-2222-5555-7547-6bac8561d38e # (pseudo-)random valid uuid4
user:
id: testuser # $USER from ERT
description:
- yet other detailed description
- optional
model:
name: ff
revision: 21.0.0.dev
description:
- detailed description
- optional

# access:
# Level containing properties used for access control

access:
asset:
name: Drogon # adding level to make room for unique ID in the future

# ensemble objects have no ssdl details

# masterdata:
# These are static data read from external config YAML file (fmuconfig or similar).
# Some of these static data are used while processing to populate some non-required
# fields in data block (for example names alias, color-tables)

masterdata:
smda:
country:
- identifier: Norway
uuid: ad214d85-8a1d-19da-e053-c918a4889309
discovery:
- short_identifier: DROGON
uuid: ad214d85-8a1d-19da-e053-c918a4889309
field:
- identifier: DROGON
uuid: 00000000-0000-0000-0000-000000000000
coordinate_system:
identifier: ST_WGS84_UTM37N_P32637
uuid: ad214d85-dac7-19da-e053-c918a4889309
stratigraphic_column:
identifier: DROGON_2020
uuid: 12345678-1234-1234-1234-123456789012

tracklog:
- datetime: 2020-10-28T14:28:02
user:
id: testuser
event: created
- datetime: 2020-10-28T14:46:14
user:
id: testuser
event: updated
84 changes: 84 additions & 0 deletions tests/test_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
import time
from pathlib import Path
import logging
import subprocess
import json

from fmu.sumo import uploader

if not sys.platform.startswith('darwin'):
import openvds

# run the tests from the root dir
TEST_DIR = Path(__file__).parent / "../"
os.chdir(TEST_DIR)
Expand Down Expand Up @@ -200,6 +205,85 @@ def test_wrong_metadata(token):
assert total == 2


@pytest.mark.skipif(sys.platform.startswith('darwin'), reason="do not run OpenVDS SEGYImport on mac os")
def test_openvds_available(token):
    """Check that the OpenVDS SEGYImport binary is installed and runnable."""
    # SEGYImport is installed by the OpenVDS package under <env>/bin,
    # one level up from the directory holding the python executable.
    env_dir = os.path.dirname(sys.executable)
    logger.info(env_dir)
    segyimport_path = os.path.join(env_dir, '..', 'bin', 'SEGYImport')
    logger.info(segyimport_path)
    version_result = subprocess.run([segyimport_path, '--version'],
                                    capture_output=True, text=True)
    # A zero exit code and the tool's name in stdout prove the binary works.
    assert version_result.returncode == 0
    assert "SEGYImport" in version_result.stdout


@pytest.mark.skipif(sys.platform.startswith('darwin'), reason="do not run OpenVDS SEGYImport on mac os")
def test_seismic_openvds_file(token):
    """Upload seismic in OpenVDS format to Sumo. Assert that it is there.

    End-to-end: registers a case, uploads a SEGY file (which the uploader
    converts to OpenVDS in blob storage), then verifies the object via
    search, SEGYExport round-trip, and direct OpenVDS cloud reads.
    """
    sumo_connection = uploader.SumoConnection(env=ENV, token=token)
    e = uploader.CaseOnDisk(
        case_metadata_path="tests/data/test_case_080/case_segy.yml",
        sumo_connection=sumo_connection,
    )
    segy_filepath = "tests/data/test_case_080/seismic.segy"
    e.register()
    e.add_files(segy_filepath)
    e.upload()

    # Give Sumo time to index the uploaded object before searching.
    time.sleep(4)

    query = f"_sumo.parent_object: {e.fmu_case_uuid}"
    search_results = sumo_connection.api.get(
        "/search", query=query, size=100, **{"from": 0}
    )
    total = search_results.get("hits").get("total").get("value")
    assert total == 1

    # The uploader rewrites segy metadata to openvds with a blank checksum.
    assert search_results.get("hits").get("hits")[0].get("_source").get("data").get("format") == "openvds"
    assert search_results.get("hits").get("hits")[0].get("_source").get("file").get("checksum_md5") == ""

    # Get SAS token to read from az blob store.
    child_id = search_results.get("hits").get("hits")[0].get("_id")
    method = f"/objects('{child_id}')/blob/authuri"
    token_results = sumo_connection.api.get(method)
    # Parse the auth response once (was previously decoded/parsed twice).
    blob_auth = json.loads(token_results.decode("utf-8"))
    url = '"azureSAS:' + blob_auth.get("baseuri")[6:] + child_id + '"'
    url_conn = '"Suffix=?' + blob_auth.get("auth") + '"'

    # Export from az blob store to a segy file on local disk.
    exported_filepath = "exported.segy"
    if os.path.exists(exported_filepath):
        os.remove(exported_filepath)
    python_path = os.path.dirname(sys.executable)
    path_to_SEGYExport = os.path.join(python_path, '..', 'bin', 'SEGYExport')
    # Use the exported_filepath variable (was a duplicated hard-coded literal).
    cmdstr = ' '.join([path_to_SEGYExport,
                       '--url', url,
                       '--connection', url_conn,
                       exported_filepath])
    cmd_result = subprocess.run(cmdstr,
                                capture_output=True, text=True, shell=True)
    assert cmd_result.returncode == 0
    assert os.path.isfile(exported_filepath)
    # Round-trip check: exported file must match the original byte size.
    assert os.stat(exported_filepath).st_size == os.stat(segy_filepath).st_size
    os.remove(exported_filepath)

    # Use OpenVDS Python API to read directly from az cloud storage.
    # url/url_conn carry shell quoting; strip the surrounding quotes here.
    handle = openvds.open(url[1:-1], url_conn[1:-1])
    layout = openvds.getLayout(handle)
    channel_count = layout.getChannelCount()
    assert channel_count == 3
    assert layout.getChannelName(0) == "Amplitude"

    # Delete this case.
    path = f"/objects('{e.fmu_case_uuid}')"
    sumo_connection.api.delete(path=path)
    time.sleep(30)  # Sumo removes the container

    # OpenVDS reads should fail after deletion. NOTE: `match` is a regex
    # (re.search), so the previous "downloading*" merely made the final
    # 'g' optional; match on the plain substring instead.
    with pytest.raises(RuntimeError, match="Error on downloading"):
        handle = openvds.open(url[1:-1], url_conn[1:-1])


def test_teardown(token):
"""Teardown all testdata"""
sumo_connection = uploader.SumoConnection(env=ENV, token=token)
Expand Down

0 comments on commit e0d4d27

Please sign in to comment.