From 3c3f287b90b88aa48f28364604ce6502699fa956 Mon Sep 17 00:00:00 2001 From: mferrera Date: Mon, 6 Nov 2023 13:41:08 +0100 Subject: [PATCH] CLN: Add typing to `common.sys` Also does some minor refactoring, particularly to the initialization function. --- src/xtgeo/common/sys.py | 488 +++++++++++++++++-------------- tests/test_common/test_system.py | 15 +- 2 files changed, 272 insertions(+), 231 deletions(-) diff --git a/src/xtgeo/common/sys.py b/src/xtgeo/common/sys.py index 51e1362df..14d6db0c6 100644 --- a/src/xtgeo/common/sys.py +++ b/src/xtgeo/common/sys.py @@ -1,18 +1,19 @@ -# -*- coding: utf-8 -*- """Module for basic XTGeo interaction with OS/system files and folders.""" +from __future__ import annotations + import hashlib import io import os import pathlib +import platform import re import struct import uuid from os.path import join -from platform import system as plfsys from tempfile import mkstemp from types import BuiltinFunctionType -from typing import Optional +from typing import TYPE_CHECKING, Callable, Literal, Optional, Type, Union import h5py import numpy as np @@ -22,6 +23,40 @@ from ._xyz_enum import _AttrType from .xtgeo_dialog import XTGeoDialog +if TYPE_CHECKING: + import numpy.typing as npt + import pandas as pd + + from xtgeo import ( + BlockedWell, + BlockedWells, + Cube, + Grid, + GridProperties, + GridProperty, + Points, + Polygons, + RegularSurface, + Surfaces, + Well, + Wells, + ) + + XTGeoObject = Union[ + BlockedWell, + BlockedWells, + Cube, + Grid, + GridProperty, + GridProperties, + Points, + Polygons, + RegularSurface, + Surfaces, + Well, + Wells, + ] + xtg = XTGeoDialog() logger = xtg.functionlogger(__name__) @@ -54,10 +89,16 @@ VALID_FILE_ALIASES = ["$fmu-v1", "$md5sum", "$random"] -def npfromfile(fname, dtype=np.float32, count=1, offset=0, mmap=False): +def npfromfile( + fname: Union[str, pathlib.Path, io.BytesIO, io.StringIO], + dtype: npt.DTypeLike = np.float32, + count: int = 1, + offset: int = 0, + mmap: bool = False, +) -> np.ndarray: """Wrapper round np.fromfile to be compatible with older np versions.""" try: - if mmap: + if mmap and not isinstance(fname, (io.BytesIO, io.StringIO)): vals = np.memmap( fname, dtype=dtype, shape=(count,), mode="r", offset=offset ) @@ -65,16 +106,20 @@ def npfromfile(fname, dtype=np.float32, count=1, offset=0, mmap=False): vals = np.fromfile(fname, dtype=dtype, count=count, offset=offset) except TypeError as err: # offset keyword requires numpy >= 1.17, need this for backward compat.: - if "'offset' is an invalid" in str(err): - with open(fname, "rb") as buffer: - buffer.seek(offset) - vals = np.fromfile(buffer, dtype=dtype, count=count) - else: + if "'offset' is an invalid" not in str(err): raise + if not isinstance(fname, (str, pathlib.Path)): + raise + with open(fname, "rb") as buffer: + buffer.seek(offset) + vals = np.fromfile(buffer, dtype=dtype, count=count) return vals -def check_folder(fname, raiseerror=None): +def check_folder( + fname: Union[str, pathlib.Path, io.BytesIO, io.StringIO], + raiseerror: Optional[Type[Exception]] = None, +) -> bool: """General function to check folder.""" _nn = _XTGeoFile(fname) status = _nn.check_folder(raiseerror=raiseerror) @@ -82,15 +127,18 @@ def check_folder(fname, raiseerror=None): return status -def generic_hash(gid, hashmethod="md5"): +def generic_hash( + gid: str, hashmethod: Union[Literal["md5", "sha256", "blake2d"], Callable] = "md5" +) -> str: """Return a unique hash ID for current instance. This hash can e.g. be used to compare two instances for equality. Args: - gid (str): Any string as signature, e.g. cumulative attributes of an instance. - hashmethod (str or function): Supported methods are "md5", "sha256", "blake2b" + gid: Any string as signature, e.g. cumulative attributes of an instance. + hashmethod: Supported methods are "md5", "sha256", "blake2b" or use a full function signature e.g. hashlib.sha128. + Defaults to md5. Returns: Hash signature. @@ -99,50 +147,65 @@ def generic_hash(gid, hashmethod="md5"): KeyError: String in hashmethod has an invalid option .. versionadded:: 2.14 + """ - validmethods = { + validmethods: dict[str, Callable] = { "md5": hashlib.md5, "sha256": hashlib.sha256, "blake2b": hashlib.blake2b, } - mhash = None - if isinstance(hashmethod, str): + if isinstance(hashmethod, str) and hashmethod in validmethods: mhash = validmethods[hashmethod]() elif isinstance(hashmethod, BuiltinFunctionType): mhash = hashmethod() + else: + raise ValueError(f"Invalid hash method provided: {hashmethod}") mhash.update(gid.encode()) return mhash.hexdigest() -class _XTGeoFile(object): - """A private class for file/stream handling in/out of XTGeo and CXTGeo. +class _XTGeoFile: + """ + A private class for file/stream handling in/out of XTGeo and CXTGeo. Interesting attributes: - xfile = _XTGeoFile(..some Path or str or BytesIO ...) + xfile = _XTGeoFile(..some Path or str or BytesIO ...) xfile.name: The absolute path to the file (str) xfile.file: The pathlib.Path instance xfile.memstream: Is True if memory stream - xfile.exist(): Returns True (provided mode 'r') if file exists, always True for 'w' + xfile.exists(): Returns True (provided mode 'r') if file exists, always True for 'w' xfile.check_file(...): As above but may raise an Excpetion xfile.check_folder(...): For folder; may raise an Excpetion xfile.splitext(): return file's stem and extension xfile.get_cfhandle(): Get C SWIG file handle xfile.cfclose(): Close current C SWIG filehandle - """ - def __init__(self, fobj, mode="rb", obj=None): - self._file = None # Path instance or BytesIO memory stream - self._tmpfile = None + def __init__( + self, + filelike: Union[str, pathlib.Path, io.BytesIO, io.StringIO], + mode: Literal["rb", "wb"] = "rb", + obj: XTGeoObject = None, + ) -> None: + logger.debug("Init ran for _XTGeoFile") + + if not isinstance(filelike, (str, pathlib.Path, io.BytesIO, io.StringIO)): + raise RuntimeError( + f"Cannot instantiate {self.__class__} from " + f"{filelike} of type {type(filelike)}. Expected " + f"a str, pathlib.Path, io.BytesIO, or io.StringIO." + ) + + self._file: Union[pathlib.Path, io.BytesIO, io.StringIO] + self._tmpfile: Optional[str] = None self._delete_after = False # delete file (e.g. tmp) afterwards self._mode = mode - self._memstream = False self._cfhandle = 0 self._cfhandlecount = 0 @@ -150,67 +213,51 @@ def __init__(self, fobj, mode="rb", obj=None): # for internal usage in tests; mimick window/mac with no fmemopen in C self._fake_nofmem = False - logger.debug("Init ran for _XTGeoFile") + if isinstance(filelike, str): + filelike = pathlib.Path(filelike) - # The self._file must be a Pathlib or a BytesIO instance - if isinstance(fobj, pathlib.Path): - self._file = fobj - elif isinstance(fobj, str): - self._file = pathlib.Path(fobj) - elif isinstance(fobj, io.BytesIO): - self._file = fobj - self._memstream = True - elif isinstance(fobj, io.StringIO): - self._file = fobj - self._memstream = True - elif isinstance(fobj, _XTGeoFile): - raise RuntimeError("Reinstancing object, not allowed", self.__class__) - else: - raise RuntimeError( - f"Illegal input, cannot continue ({self.__class__}) " - f"{fobj}: {type(fobj)}" - ) + self._file = filelike + self._memstream = isinstance(self._file, (io.BytesIO, io.StringIO)) if obj and not self._memstream: self.resolve_alias(obj) - logger.info("Ran init of %s, ID is %s", __name__, id(self)) + logger.debug("Ran init of %s, ID is %s", __name__, id(self)) @property - def memstream(self): - """Read only: Get True if file object is a memory stream (BytesIO).""" + def memstream(self) -> bool: + """Get whether or not this file is a io.BytesIO/StringIO memory stream.""" return self._memstream @property - def file(self): - """Read only: Get Path object (if input was file) or BytesIO object.""" + def file(self) -> Union[pathlib.Path, io.BytesIO, io.StringIO]: + """Get Path object (if input was file) or memory stream object.""" return self._file @property - def name(self): - """The absolute path name of a file.""" - logger.info("Get absolute name of file...") - - if self._memstream: - return self._file + def name(self) -> Union[str, io.BytesIO, io.StringIO]: + """Get the absolute path name of the file, or the memory stream.""" + if isinstance(self.file, (io.BytesIO, io.StringIO)): + return self.file try: - logger.debug("Try resolve...") - fname = str(self._file.resolve()) + logger.debug("Trying to resolve filepath") + fname = str(self.file.resolve()) except OSError: try: - logger.debug("Try resolve parent, then file...") + logger.debug("Trying to resolve parent, then file...") fname = os.path.abspath( - join(str(self._file.parent.resolve()), str(self._file.name)) + join(str(self.file.parent.resolve()), str(self.file.name)) ) except OSError: # means that also folder is invalid logger.debug("Last attempt of name resolving...") - fname = os.path.abspath(str(self._file)) + fname = os.path.abspath(str(self.file)) return fname - def resolve_alias(self, obj): - """Change a file path name alias to autogenerated name, based on rules. + def resolve_alias(self, obj: XTGeoObject) -> None: + """ + Change a file path name alias to autogenerated name, based on rules. Only the file stem name will be updated, not the file name extension. Any parent folders and file suffix/extension will be returned as is. @@ -218,7 +265,7 @@ def resolve_alias(self, obj): Aliases supported so far are '$md5sum' '$random' '$fmu-v1' Args: - obj (XTGeo instance): Instance of e.g. RegularSurface() + obj: Instance of some XTGeo object e.g. RegularSurface() Example:: >>> import xtgeo @@ -228,12 +275,14 @@ def resolve_alias(self, obj): /tmp/c144fe19742adac8187b97e7976ac68c.gri .. versionadded:: 2.14 + """ - fileobj = self._file + if self.memstream or isinstance(self.file, (io.BytesIO, io.StringIO)): + return - parent = fileobj.parent - stem = fileobj.stem - suffix = fileobj.suffix + parent = self.file.parent + stem = self.file.stem + suffix = self.file.suffix if "$" in stem and stem not in VALID_FILE_ALIASES: raise ValueError( @@ -260,74 +309,83 @@ def resolve_alias(self, obj): self._file = (parent / newname).with_suffix(suffix) - def exists(self): # was: file_exists - """Check if 'r' file, memory stream or folder exists, and returns True of OK.""" + def exists(self) -> bool: + """Returns True if 'r' file, memory stream, or folder exists.""" if "r" in self._mode: - if isinstance(self._file, io.BytesIO): - return True - - if self._file.exists(): + if isinstance(self.file, (io.BytesIO, io.StringIO)): return True + return self.file.exists() - return False - + # Writes and appends will always exist after writing return True - def check_file(self, raiseerror=None, raisetext=None): - """Check if a file exists, and raises an OSError if not. + def check_file( + self, + raiseerror: Optional[Type[Exception]] = None, + raisetext: Optional[str] = None, + ) -> bool: + """ + Check if a file exists, and raises an OSError if not. This is only meaningful for 'r' files. Args: - raiseerror (Exception): Type of Exception, default is None, which means - no Exception, just return False or True - raisetext (str): Which message to display if raiseerror, None gives a - default message. + raiseerror: Type of exception to raise. Default is None, which means + no Exception, just return False or True. + raisetext: Which message to display if raiseerror. Defaults to None + which gives a default message. + + Returns: + True if file exists and is readable, False if not. - Return: - status: True, if file exists and is readable, False if not. """ - logger.info("Checking file...") + logger.debug("Checking file...") - if self.memstream: + # Redundant but mypy can't follow when memstream is True + if self.memstream or isinstance(self.file, (io.BytesIO, io.StringIO)): return True if raisetext is None: raisetext = f"File {self.name} does not exist or cannot be accessed" if "r" in self._mode: - if not self._file.is_file() or not self.exists(): + if not self.file.is_file() or not self.exists(): if raiseerror is not None: raise raiseerror(raisetext) - return False return True - def check_folder(self, raiseerror=None, raisetext=None): - """Check if folder given in xfile exists and is writeable. + def check_folder( + self, + raiseerror: Optional[Type[Exception]] = None, + raisetext: Optional[str] = None, + ) -> bool: + """ + Check if folder given in file exists and is writeable. - The file itself may not exist (yet), only the folder is checked + The file itself may not exist (yet), only the folder is checked. Args: - raiseerror (exception): If none, then return True or False, else raise the - given Exception if False - raisetext (str): Text to raise. + raiseerror: Type of exception to raise. Default is None, which means + no Exception, just return False or True. + raisetext: Which message to display if raiseerror. Defaults to None + which gives a default message. - Return: - status: True, if folder exists and is writable, False if not. + Returns: + True if folder exists and is writable, False if not. Raises: ValueError: If the file is a memstream """ - if self.memstream: - raise ValueError("Tried to check folder status of a in-memory file") + # Redundant but mypy can't follow when memstream is True + if self.memstream or isinstance(self.file, (io.BytesIO, io.StringIO)): + raise ValueError("Cannot check folder status of an in-memory file") - logger.info("Checking folder...") + logger.debug("Checking folder...") - status = True - folder = self._file.parent + folder = self.file.parent if raisetext is None: raisetext = f"Folder {folder.name} does not exist or cannot be accessed" @@ -335,43 +393,19 @@ def check_folder(self, raiseerror=None, raisetext=None): if raiseerror: raise raiseerror(raisetext) - status = False - - return status - - # # Here are issues here on Windows/Mac in particular - - # status = True - - # if os.path.isdir(self._file): - # folder = self._file - # else: - # folder = os.path.dirname(self._file) - # if folder == "": - # folder = "." - - # if not os.path.exists(folder): - # if raiseerror: - # raise raiseerror("Folder does not exist: <{}>".format(folder)) - - # status = False - - # if os.path.exists(folder) and not os.access(folder, os.W_OK): - # if raiseerror: - # raise raiseerror( - # "Folder does exist but is not writable: <{}>".format(folder) - # ) - - # status = False + return False - # return status + return True - def splitext(self, lower=False): + def splitext(self, lower: bool = False) -> tuple[str, str]: """Return file stem and suffix, always lowercase if lower is True.""" - logger.info("Run splitext to get stem and suffix...") + if self.memstream or isinstance(self.file, (io.BytesIO, io.StringIO)): + raise ValueError("Cannot split extension of an in-memory file") + + logger.debug("Run splitext to get stem and suffix...") - stem = self._file.stem - suffix = self._file.suffix + stem = self.file.stem + suffix = self.file.suffix suffix = suffix.replace(".", "") if lower: @@ -380,105 +414,103 @@ def splitext(self, lower=False): return stem, suffix - def get_cfhandle(self): # was get_handle - """Get SWIG C file handle for CXTgeo. + def get_cfhandle(self) -> int: + """ + Get SWIG C file handle for CXTGeo. This is tied to cfclose() which closes the file. if _cfhandle already exists, then _cfhandlecount is increased with 1 + Returns: + int indicating the file handle number. + """ - # differ on Linux and other OS as Linux can use fmemopen() in C - islinux = True - fobj = None - if plfsys() != "Linux": - islinux = False + # Windows and pre-10.13 macOS lack fmemopen() + islinux = platform.system() == "Linux" if self._cfhandle and "Swig Object of type 'FILE" in str(self._cfhandle): self._cfhandlecount += 1 - logger.info("Get SWIG C fhandle no %s", self._cfhandlecount) + logger.debug("Get SWIG C fhandle no %s", self._cfhandlecount) return self._cfhandle - if isinstance(self._file, io.BytesIO) and self._mode == "rb" and islinux: - fobj = self._file.getvalue() # bytes type in Python3, str in Python2 - - # note that the typemap in swig computes the length for the buf/fobj! - self._memstream = True - - elif isinstance(self._file, io.BytesIO) and self._mode == "wb" and islinux: - fobj = bytes() - self._memstream = True + fobj: Union[bytes, str, io.BytesIO, io.StringIO] = self.name + if isinstance(self.file, io.BytesIO): + if self._mode == "rb" and islinux: + fobj = self.file.getvalue() + elif self._mode == "wb" and islinux: + fobj = bytes() + elif self._mode == "rb" and not islinux: + # Write stream to a temporary file + fds, self._tmpfile = mkstemp(prefix="tmpxtgeoio") + os.close(fds) + with open(self._tmpfile, "wb") as newfile: + newfile.write(self.file.getvalue()) - elif ( - isinstance(self._file, io.BytesIO) - and self._mode == "rb" - and not islinux # Windows or Darwin - ): - # windows/mac miss fmemopen; write buffer to a tmp instead as workaround - fds, self._tmpfile = mkstemp(prefix="tmpxtgeoio") - os.close(fds) - with open(self._tmpfile, "wb") as newfile: - newfile.write(self._file.getvalue()) - - else: - fobj = self.name - - if self._memstream: + if self.memstream: if islinux: cfhandle = _cxtgeo.xtg_fopen_bytestream(fobj, self._mode) else: cfhandle = _cxtgeo.xtg_fopen(self._tmpfile, self._mode) - else: try: cfhandle = _cxtgeo.xtg_fopen(fobj, self._mode) except TypeError as err: - raise IOError(f"Cannot open file: {fobj}") from err + raise IOError(f"Cannot open file: {fobj!r}") from err self._cfhandle = cfhandle self._cfhandlecount = 1 - logger.info("Get initial SWIG C fhandle no %s", self._cfhandlecount) + logger.debug("Get initial SWIG C fhandle no %s", self._cfhandlecount) return self._cfhandle - def cfclose(self, strict=True): - """Close SWIG C file handle by keeping track of _cfhandlecount. + def cfclose(self, strict: bool = True) -> bool: + """ + Close SWIG C file handle by keeping track of _cfhandlecount. + + Returns: + True if cfhandle is closed. - Return True if cfhandle is really closed. """ - logger.info("Request for closing SWIG fhandle no: %s", self._cfhandlecount) + logger.debug("Request for closing SWIG fhandle no: %s", self._cfhandlecount) - if self._cfhandle is None or self._cfhandlecount == 0: + if self._cfhandle == 0 or self._cfhandlecount == 0: if strict: raise RuntimeError("Ask to close a nonexisting C file handle") - self._cfhandle = None + self._cfhandle = 0 self._cfhandlecount = 0 return True - if self._cfhandlecount > 1 or self._cfhandlecount == 0: + if self._cfhandlecount > 1: self._cfhandlecount -= 1 - logger.info( + logger.debug( "Remaining SWIG cfhandles: %s, do not close...", self._cfhandlecount ) return False - if self._memstream and self._cfhandle and "w" in self._mode: + if ( + isinstance(self.file, io.BytesIO) + and self._cfhandle > 0 + and "w" in self._mode + ): # this assures that the file pointer is in the end of the current filehandle npos = _cxtgeo.xtg_ftell(self._cfhandle) buf = bytes(npos) - ier = _cxtgeo.xtg_get_fbuffer(self._cfhandle, buf) - if ier == 0: - self._file.write(buf) # write to bytesIO instance + + copy_code = _cxtgeo.xtg_get_fbuffer(self._cfhandle, buf) + # Returns EXIT_SUCCESS = 0 from C + if copy_code == 0: + self.file.write(buf) _cxtgeo.xtg_fflush(self._cfhandle) else: raise RuntimeError("Could not write stream for unknown reasons") - ier = _cxtgeo.xtg_fclose(self._cfhandle) - if ier != 0: - raise RuntimeError(f"Could not close C file, code {ier}") + close_code = _cxtgeo.xtg_fclose(self._cfhandle) + if close_code != 0: + raise RuntimeError(f"Could not close C file, code {close_code}") - logger.info("File is now closed for C io: %s", self.name) + logger.debug("File is now closed for C io: %s", self.name) if self._tmpfile: try: @@ -486,27 +518,30 @@ def cfclose(self, strict=True): except Exception as ex: # pylint: disable=W0703 logger.error("Could not remove tempfile for some reason: %s", ex) - self._cfhandle = None + self._cfhandle = 0 self._cfhandlecount = 0 - logger.info("Remaining SWIG cfhandles: %s, return is True", self._cfhandlecount) - + logger.debug( + "Remaining SWIG cfhandles: %s, return is True", self._cfhandlecount + ) return True def detect_fformat( self, details: Optional[bool] = False, suffixonly: Optional[bool] = False - ): - """Try to deduce format from looking at file signature. + ) -> str: + """ + Try to deduce format from looking at file signature. The file signature may be the initial part of the binary file/stream but if that fails, the file extension is used. Args: details: If True, more info is added to the return string (useful for some - formats) e.g. "hdf RegularSurface xtgeo" - suffixonly: If True, look at file suffix only. + formats) e.g. "hdf RegularSurface xtgeo". Defaults for False. + suffixonly: If True, look at file suffix only. Defaults to False. Returns: - A string with format spesification, e.g. "hdf". + A string with format specification, e.g. "hdf". + """ if not suffixonly: @@ -518,15 +553,18 @@ def detect_fformat( fmt = self._detect_format_by_extension() return self._validate_format(fmt) - def _detect_fformat_by_contents(self, details: Optional[bool] = False): + def _detect_fformat_by_contents( + self, details: Optional[bool] = False + ) -> Optional[str]: # Try the read the N first bytes maxbuf = 100 - if self.memstream: + if isinstance(self.file, (io.BytesIO, io.StringIO)): self.file.seek(0) buf = self.file.read(maxbuf) self.file.seek(0) else: + assert isinstance(self.file, pathlib.Path) if not self.exists(): raise ValueError(f"File {self.name} does not exist") with open(self.file, "rb") as fhandle: @@ -539,7 +577,7 @@ def _detect_fformat_by_contents(self, details: Optional[bool] = False): if len(buf) >= 4: _, hdf = struct.unpack("b 3s", buf[:4]) if hdf == b"HDF": - logger.info("Signature is hdf") + logger.debug("Signature is hdf") main = self._validate_format("hdf") fmt = "" @@ -564,12 +602,12 @@ def _detect_fformat_by_contents(self, details: Optional[bool] = False): if len(buf) >= 8: fortranblock, gricode = struct.unpack(">ii", buf[:8]) if fortranblock == 32 and gricode == -996: - logger.info("Signature is irap binary") + logger.debug("Signature is irap binary") return self._validate_format("irap_binary") # Petromod binary regular surface if b"Content=Map" in buf and b"DataUnitDistance" in buf: - logger.info("Signature is petromod") + logger.debug("Signature is petromod") return self._validate_format("petromod") # Eclipse binary 3D EGRID, look at FILEHEAD: @@ -581,19 +619,19 @@ def _detect_fformat_by_contents(self, details: Optional[bool] = False): fort1, name, num, _, fort2 = struct.unpack("> i 8s i 4s i", buf[:24]) if fort1 == 16 and name == b"FILEHEAD" and num == 100 and fort2 == 16: # Eclipse corner point EGRID - logger.info("Signature is egrid") + logger.debug("Signature is egrid") return self._validate_format("egrid") # Eclipse binary 3D UNRST, look for SEQNUM: # 'SEQNUM' 1 'INTE' if fort1 == 16 and name == b"SEQNUM " and num == 1 and fort2 == 16: # Eclipse UNRST - logger.info("Signature is unrst") + logger.debug("Signature is unrst") return self._validate_format("unrst") # Eclipse binary 3D INIT, look for INTEHEAD: # 'INTEHEAD' 411 'INTE' if fort1 == 16 and name == b"INTEHEAD" and num > 400 and fort2 == 16: # Eclipse INIT - logger.info("Signature is init") + logger.debug("Signature is init") return self._validate_format("init") @@ -601,11 +639,11 @@ def _detect_fformat_by_contents(self, details: Optional[bool] = False): name, _ = struct.unpack("8s b", buf[:9]) # ROFF binary 3D if name == b"roff-bin": - logger.info("Signature is roff_binary") + logger.debug("Signature is roff_binary") return self._validate_format("roff_binary") # ROFF ascii 3D if name == b"roff-asc": - logger.info("Signature is roff_ascii") + logger.debug("Signature is roff_ascii") return self._validate_format("roff_ascii") # RMS well format (ascii) @@ -626,21 +664,21 @@ def _detect_fformat_by_contents(self, details: Optional[bool] = False): and len(xbuf[1]) >= 1 and len(xbuf[2]) >= 10 ): - logger.info("Signature is rmswell") + logger.debug("Signature is rmswell") return self._validate_format("rmswell") return None - def _detect_format_by_extension(self): + def _detect_format_by_extension(self) -> str: """Detect format by extension.""" - if self.memstream: + if self.memstream or isinstance(self.file, (io.BytesIO, io.StringIO)): return "unknown" suffix = self.file.suffix[1:].lower() for fmt, variants in SUPPORTED_FORMATS.items(): if suffix in variants: - logger.info("Extension hints: %s", fmt) + logger.debug("Extension hints: %s", fmt) return fmt # if none of these above are accepted, check regular expression @@ -650,21 +688,20 @@ def _detect_format_by_extension(self): if "*" in var: pattern = re.compile(var) if pattern.match(suffix): - logger.info("Extension by regexp hints %s", fmt) + logger.debug("Extension by regexp hints %s", fmt) return fmt return "unknown" @staticmethod - def _validate_format(fmt): + def _validate_format(fmt: str) -> str: """Validate format.""" if fmt in SUPPORTED_FORMATS.keys() or fmt == "unknown": return fmt - else: - raise RuntimeError(f"Invalid format: {fmt}") + raise RuntimeError(f"Invalid format: {fmt}") @staticmethod - def generic_format_by_proposal(propose): + def generic_format_by_proposal(propose: str) -> str: """Get generic format by proposal.""" for fmt, variants in SUPPORTED_FORMATS.items(): if propose in variants: @@ -681,8 +718,8 @@ def generic_format_by_proposal(propose): raise ValueError(f"Non-supportred file extension: {propose}") -def inherit_docstring(inherit_from): - def decorator_set_docstring(func): +def inherit_docstring(inherit_from: Callable) -> Callable: + def decorator_set_docstring(func: Callable) -> Callable: if func.__doc__ is None and inherit_from.__doc__ is not None: func.__doc__ = inherit_from.__doc__ return func @@ -695,41 +732,39 @@ def decorator_set_docstring(func): # ---------------------------------------------------------------------------------- -def _convert_np_carr_int(length, np_array): +def _convert_np_carr_int(length: int, np_array: np.ndarray) -> np.ndarray: """Convert numpy 1D array to C array, assuming int type. The numpy is always a double (float64), so need to convert first """ carr = _cxtgeo.new_intarray(length) - np_array = np_array.astype(np.int32) - _cxtgeo.swig_numpy_to_carr_i1d(np_array, carr) - return carr -def _convert_np_carr_double(length, np_array): +def _convert_np_carr_double(length: int, np_array: np.ndarray) -> np.ndarray: """Convert numpy 1D array to C array, assuming double type.""" carr = _cxtgeo.new_doublearray(length) - _cxtgeo.swig_numpy_to_carr_1d(np_array, carr) - return carr -def _convert_carr_double_np(length, carray, nlen=None): +def _convert_carr_double_np( + length: int, carray: np.ndarray, nlen: Optional[int] = None +) -> np.ndarray: """Convert a C array to numpy, assuming double type.""" if nlen is None: nlen = length - nparray = _cxtgeo.swig_carr_to_numpy_1d(nlen, carray) - return nparray -def _get_carray(dataframe, attributes, attrname: str): - """Returns the C array pointer (via SWIG) for a given attr. +def _get_carray( + dataframe: pd.DataFrame, attributes: _AttrType, attrname: str +) -> Optional[np.ndarray]: + """ + Returns the C array pointer (via SWIG) for a given attr. Type conversion is double if float64, int32 if DISC attr. Returns None if log does not exist. @@ -745,5 +780,4 @@ def _get_carray(dataframe, attributes, attrname: str): carr = _convert_np_carr_int(nlen, np_array) else: carr = _convert_np_carr_double(nlen, np_array) - return carr diff --git a/tests/test_common/test_system.py b/tests/test_common/test_system.py index aaa66d157..38fd77f52 100644 --- a/tests/test_common/test_system.py +++ b/tests/test_common/test_system.py @@ -32,7 +32,7 @@ def test_generic_hash(): ahash = xsys.generic_hash("ABCDEF", hashmethod="blake2b") assert ahash[0:12] == "0bb3eb1511cb" - with pytest.raises(KeyError): + with pytest.raises(ValueError): ahash = xsys.generic_hash("ABCDEF", hashmethod="invalid") # pass a hashlib function @@ -136,15 +136,22 @@ def test_file_splitext(filename, stem, extension, obj): def test_xtgeo_file_reinstance(tmp_path): gfile = xtgeo._XTGeoFile(tmp_path / "test.txt") - with pytest.raises(RuntimeError, match="Reinstancing"): + with pytest.raises(RuntimeError, match="Cannot instantiate"): xtgeo._XTGeoFile(gfile) def test_xtgeo_file_bad_input(): - with pytest.raises(RuntimeError, match="input"): + with pytest.raises(RuntimeError, match="Cannot instantiate"): xtgeo._XTGeoFile(1.0) +def test_xtgeo_file_resolve_alias_on_stream_doesnt_modify_or_raise(): + stream = io.BytesIO() + xtg_file = xtgeo._XTGeoFile(stream) + xtg_file.resolve_alias(xtgeo.create_box_grid((1, 1, 1))) + assert stream == xtg_file.file + + def test_xtgeo_file_bad_alias(tmp_path): with pytest.raises(ValueError, match="not a valid alias"): xtgeo._XTGeoFile(tmp_path / "$NO_ALIAS").resolve_alias( @@ -163,7 +170,7 @@ def test_xtgeo_file_properties(testpath, filename): assert isinstance(gfile, xtgeo._XTGeoFile) assert isinstance(gfile._file, pathlib.Path) - assert gfile._memstream is False + assert gfile.memstream is False assert gfile._mode == "rb" assert gfile._delete_after is False assert pathlib.Path(gfile.name) == (testpath / filename).absolute().resolve()