Skip to content

Commit

Permalink
CLN: Update code base to use py10 annotation
Browse files Browse the repository at this point in the history
  • Loading branch information
JB Lovland authored and janbjorge committed Nov 20, 2023
1 parent 5988281 commit 9e24f9b
Show file tree
Hide file tree
Showing 26 changed files with 419 additions and 443 deletions.
5 changes: 3 additions & 2 deletions src/xtgeo/common/calc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from __future__ import annotations

from typing import Any, Literal, Optional, Sequence
from collections.abc import Sequence
from typing import Any, Literal

import numpy as np

Expand Down Expand Up @@ -333,7 +334,7 @@ def vectorpair_angle3d(
p2: Sequence[float],
degrees: bool = True,
birdview: bool = False,
) -> Optional[float]:
) -> float | None:
"""Find angle in 3D between two vectors
::
Expand Down
45 changes: 22 additions & 23 deletions src/xtgeo/common/sys.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
import re
import struct
import uuid
from collections.abc import Callable
from os.path import join
from tempfile import mkstemp
from types import BuiltinFunctionType
from typing import TYPE_CHECKING, Callable, Literal, Optional, Type, Union
from typing import TYPE_CHECKING, Literal, Union

import h5py
import numpy as np
Expand Down Expand Up @@ -89,7 +90,7 @@


def npfromfile(
fname: Union[str, pathlib.Path, io.BytesIO, io.StringIO],
fname: str | pathlib.Path | io.BytesIO | io.StringIO,
dtype: npt.DTypeLike = np.float32,
count: int = 1,
offset: int = 0,
Expand All @@ -116,8 +117,8 @@ def npfromfile(


def check_folder(
fname: Union[str, pathlib.Path, io.BytesIO, io.StringIO],
raiseerror: Optional[Type[Exception]] = None,
fname: str | pathlib.Path | io.BytesIO | io.StringIO,
raiseerror: type[Exception] | None = None,
) -> bool:
"""General function to check folder."""
_nn = _XTGeoFile(fname)
Expand All @@ -127,7 +128,7 @@ def check_folder(


def generic_hash(
gid: str, hashmethod: Union[Literal["md5", "sha256", "blake2d"], Callable] = "md5"
gid: str, hashmethod: Literal["md5", "sha256", "blake2d"] | Callable = "md5"
) -> str:
"""Return a unique hash ID for current instance.
Expand Down Expand Up @@ -188,7 +189,7 @@ class _XTGeoFile:

def __init__(
self,
filelike: Union[str, pathlib.Path, io.BytesIO, io.StringIO],
filelike: str | pathlib.Path | io.BytesIO | io.StringIO,
mode: Literal["rb", "wb"] = "rb",
obj: XTGeoObject = None,
) -> None:
Expand All @@ -201,8 +202,8 @@ def __init__(
f"a str, pathlib.Path, io.BytesIO, or io.StringIO."
)

self._file: Union[pathlib.Path, io.BytesIO, io.StringIO]
self._tmpfile: Optional[str] = None
self._file: pathlib.Path | io.BytesIO | io.StringIO
self._tmpfile: str | None = None
self._delete_after = False # delete file (e.g. tmp) afterwards
self._mode = mode

Expand All @@ -229,12 +230,12 @@ def memstream(self) -> bool:
return self._memstream

@property
def file(self) -> Union[pathlib.Path, io.BytesIO, io.StringIO]:
def file(self) -> pathlib.Path | io.BytesIO | io.StringIO:
"""Get Path object (if input was file) or memory stream object."""
return self._file

@property
def name(self) -> Union[str, io.BytesIO, io.StringIO]:
def name(self) -> str | io.BytesIO | io.StringIO:
"""Get the absolute path name of the file, or the memory stream."""
if isinstance(self.file, (io.BytesIO, io.StringIO)):
return self.file
Expand Down Expand Up @@ -320,8 +321,8 @@ def exists(self) -> bool:

def check_file(
self,
raiseerror: Optional[Type[Exception]] = None,
raisetext: Optional[str] = None,
raiseerror: type[Exception] | None = None,
raisetext: str | None = None,
) -> bool:
"""
Check if a file exists, and raises an OSError if not.
Expand Down Expand Up @@ -357,8 +358,8 @@ def check_file(

def check_folder(
self,
raiseerror: Optional[Type[Exception]] = None,
raisetext: Optional[str] = None,
raiseerror: type[Exception] | None = None,
raisetext: str | None = None,
) -> bool:
"""
Check if folder given in file exists and is writeable.
Expand Down Expand Up @@ -433,12 +434,12 @@ def get_cfhandle(self) -> int:
logger.debug("Get SWIG C fhandle no %s", self._cfhandlecount)
return self._cfhandle

fobj: Union[bytes, str, io.BytesIO, io.StringIO] = self.name
fobj: bytes | str | io.BytesIO | io.StringIO = self.name
if isinstance(self.file, io.BytesIO):
if self._mode == "rb" and islinux:
fobj = self.file.getvalue()
elif self._mode == "wb" and islinux:
fobj = bytes()
fobj = b"" # Empty bytes obj.
elif self._mode == "rb" and not islinux:
# Write stream to a temporary file
fds, self._tmpfile = mkstemp(prefix="tmpxtgeoio")
Expand All @@ -455,7 +456,7 @@ def get_cfhandle(self) -> int:
try:
cfhandle = _cxtgeo.xtg_fopen(fobj, self._mode)
except TypeError as err:
raise IOError(f"Cannot open file: {fobj!r}") from err
raise OSError(f"Cannot open file: {fobj!r}") from err

self._cfhandle = cfhandle
self._cfhandlecount = 1
Expand Down Expand Up @@ -525,7 +526,7 @@ def cfclose(self, strict: bool = True) -> bool:
return True

def detect_fformat(
self, details: Optional[bool] = False, suffixonly: Optional[bool] = False
self, details: bool | None = False, suffixonly: bool | None = False
) -> str:
"""
Try to deduce format from looking at file signature.
Expand All @@ -552,9 +553,7 @@ def detect_fformat(
fmt = self._detect_format_by_extension()
return self._validate_format(fmt)

def _detect_fformat_by_contents(
self, details: Optional[bool] = False
) -> Optional[str]:
def _detect_fformat_by_contents(self, details: bool | None = False) -> str | None:
# Try the read the N first bytes
maxbuf = 100

Expand Down Expand Up @@ -750,7 +749,7 @@ def _convert_np_carr_double(length: int, np_array: np.ndarray) -> np.ndarray:


def _convert_carr_double_np(
length: int, carray: np.ndarray, nlen: Optional[int] = None
length: int, carray: np.ndarray, nlen: int | None = None
) -> np.ndarray:
"""Convert a C array to numpy, assuming double type."""
if nlen is None:
Expand All @@ -761,7 +760,7 @@ def _convert_carr_double_np(

def _get_carray(
dataframe: pd.DataFrame, attributes: _AttrType, attrname: str
) -> Optional[np.ndarray]:
) -> np.ndarray | None:
"""
Returns the C array pointer (via SWIG) for a given attr.
Expand Down
31 changes: 16 additions & 15 deletions src/xtgeo/cube/_cube_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@
xline, 1 for iline in this example) but shall be constant per axis
"""
from __future__ import annotations

import json
from collections import OrderedDict, defaultdict
from copy import deepcopy
from struct import unpack
from typing import Dict, List, Tuple
from warnings import warn

import numpy as np
Expand All @@ -47,7 +48,7 @@
logger = null_logger(__name__)


def import_segy(sfile: xtgeo._XTGeoFile) -> Dict:
def import_segy(sfile: xtgeo._XTGeoFile) -> dict:
"""Import SEGY via the SegyIO library.
Args:
Expand Down Expand Up @@ -76,7 +77,7 @@ def import_segy(sfile: xtgeo._XTGeoFile) -> Dict:
else:
raise
except Exception as anyerror: # catch the rest
raise IOError(f"Cannot parse SEGY file: {str(anyerror)}") from anyerror
raise OSError(f"Cannot parse SEGY file: {str(anyerror)}") from anyerror

if not attributes:
raise ValueError("Could not get attributes for segy file")
Expand All @@ -85,7 +86,7 @@ def import_segy(sfile: xtgeo._XTGeoFile) -> Dict:
return attributes


def _import_segy_all_traces(segyfile: segyio.segy.SegyFile) -> Dict:
def _import_segy_all_traces(segyfile: segyio.segy.SegyFile) -> dict:
"""Import a a full cube SEGY via the SegyIO library to xtgeo format spec.
Here, the segyio.tools.cube function can be applied
Expand Down Expand Up @@ -128,7 +129,7 @@ def _process_cube_values(values: np.ndarray) -> np.ndarray:

def _segy_all_traces_attributes(
segyfile: segyio.segy.SegyFile, ncol, nrow, nlay
) -> Dict:
) -> dict:
"""Get the geometrical values xtgeo needs for a cube definition."""
trcode = segyio.TraceField.TraceIdentificationCode
traceidcodes = segyfile.attributes(trcode)[:].reshape(ncol, nrow)
Expand Down Expand Up @@ -169,7 +170,7 @@ def _segy_all_traces_attributes(
}


def _import_segy_incomplete_traces(segyfile: segyio.segy.SegyFile) -> Dict:
def _import_segy_incomplete_traces(segyfile: segyio.segy.SegyFile) -> dict:
"""Import a a cube SEGY with incomplete traces via the SegyIO library.
Note that the undefined value will be xtgeo.UNDEF (large number)!
Expand Down Expand Up @@ -242,7 +243,7 @@ def _import_segy_incomplete_traces(segyfile: segyio.segy.SegyFile) -> Dict:
return attrs


def _inverse_anyline_map(anylines: List[int]) -> Dict:
def _inverse_anyline_map(anylines: list[int]) -> dict:
"""Small helper function to get e.g. inline 2345: [0, 1, 2, .., 70].
I.e. to get a mapping between inline number and a list of possible indices
Expand All @@ -255,7 +256,7 @@ def _inverse_anyline_map(anylines: List[int]) -> Dict:
return anyll


def _find_long_line(anyll: dict, nany: int) -> List:
def _find_long_line(anyll: dict, nany: int) -> list:
"""Helper function; get index of vectors indices to be used for calculations.
Look for a "sufficiently" long inline/xline, as long distance between points
Expand Down Expand Up @@ -285,13 +286,13 @@ def _geometry_incomplete_traces(
segyfile: segyio.segy.SegyFile,
ncol: int,
nrow: int,
ilines: List[int],
xlines: List[int],
ilines_case: List[int],
xlines_case: List[int],
ilines: list[int],
xlines: list[int],
ilines_case: list[int],
xlines_case: list[int],
ispacing: int,
xspacing: int,
) -> List:
) -> list:
"""Compute xtgeo attributes (mostly geometries) for incomplete trace cube."""
attrs = dict()

Expand Down Expand Up @@ -350,7 +351,7 @@ def _geometry_incomplete_traces(

def _get_coordinate(
segyfile: segyio.segy.SegyFile, segyindex: int
) -> Tuple[float, float, float, float]:
) -> tuple[float, float, float, float]:
"""Helper function to get coordinates given a index."""
origin = segyfile.header[segyindex][
segyio.su.cdpx,
Expand Down Expand Up @@ -474,7 +475,7 @@ def _scan_segy_trace(sfile, outfile):

def import_stormcube(
sfile: xtgeo._XTGeoFile,
) -> Dict:
) -> dict:
"""Import on StormCube format."""
# The ASCII header has all the metadata on the form:
# ---------------------------------------------------------------------
Expand Down
13 changes: 7 additions & 6 deletions src/xtgeo/cube/cube1.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# coding: utf-8
"""Module for a seismic (or whatever) cube."""
from __future__ import annotations

import functools
import numbers
import os.path
import pathlib
import tempfile
import warnings
from typing import Any, Optional, Tuple
from typing import Any

import deprecation
import numpy as np
Expand Down Expand Up @@ -951,10 +952,10 @@ def to_roxar(
self,
project: Any,
name: str,
folder: Optional[str] = None,
folder: str | None = None,
propname: str = "seismic_attribute",
domain: str = "time",
compression: Tuple[str, float] = ("wavelet", 5.0),
compression: tuple[str, float] = ("wavelet", 5.0),
target: str = "seismic",
): # pragma: no cover
"""Export (transfer) a cube from a XTGeo cube object to Roxar data.
Expand Down Expand Up @@ -1033,7 +1034,7 @@ def scan_segy_traces(sfile, outfile=None):
if oflag:
# pass
logger.info("OUTPUT to screen...")
with open(outfile, "r") as out:
with open(outfile) as out:
for line in out:
print(line.rstrip("\r\n"))
os.remove(outfile)
Expand All @@ -1060,7 +1061,7 @@ def scan_segy_header(sfile, outfile=None):

if flag:
logger.info("OUTPUT to screen...")
with open(outfile, "r") as out:
with open(outfile) as out:
for line in out:
print(line.rstrip("\r\n"))
os.remove(outfile)
Expand Down
35 changes: 18 additions & 17 deletions src/xtgeo/grid3d/_ecl_logi_head.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import List, Optional

from ._ecl_output_file import Simulator

Expand Down Expand Up @@ -36,24 +37,24 @@ class LogiHead:
"""

dissolved_gas: Optional[bool] = None
vaporized_oil: Optional[bool] = None
directional: Optional[bool] = None
radial: Optional[bool] = None
reversible: Optional[bool] = None
hysterisis: Optional[bool] = None
dual_porosity: Optional[bool] = None
end_point_scaling: Optional[bool] = None
directional_end_point_scaling: Optional[bool] = None
reversible_end_point_scaling: Optional[bool] = None
alternate_end_point_scaling: Optional[bool] = None
miscible_displacement: Optional[bool] = None
scale_water_pressure1: Optional[bool] = None
scale_water_pressure2: Optional[bool] = None
coal_bed_methane: Optional[bool] = None
dissolved_gas: bool | None = None
vaporized_oil: bool | None = None
directional: bool | None = None
radial: bool | None = None
reversible: bool | None = None
hysterisis: bool | None = None
dual_porosity: bool | None = None
end_point_scaling: bool | None = None
directional_end_point_scaling: bool | None = None
reversible_end_point_scaling: bool | None = None
alternate_end_point_scaling: bool | None = None
miscible_displacement: bool | None = None
scale_water_pressure1: bool | None = None
scale_water_pressure2: bool | None = None
coal_bed_methane: bool | None = None

@classmethod
def from_file_values(cls, values: List[bool], simulator: Simulator):
def from_file_values(cls, values: list[bool], simulator: Simulator):
"""Construct a LogiHead from the array following the LOGIHEAD keyword
Args:
values: The iterable of boolean values following the LOGIHEAD keyword
Expand Down
Loading

0 comments on commit 9e24f9b

Please sign in to comment.