From 27a6ce7c20338a77241dfaf80f1f7ac1da77dd18 Mon Sep 17 00:00:00 2001 From: Mathias Lohne Date: Thu, 12 Dec 2024 11:10:24 +0100 Subject: [PATCH] Update type hints Go from `Optional[str]` to `str | None` Use `list`, `dict` and so on directly Use `str | int` instead of `Union[str, int]` And do all of it consistently. --- cognite/extractorutils/__init__.py | 2 + cognite/extractorutils/_inner_util.py | 6 +- cognite/extractorutils/base.py | 29 +++-- cognite/extractorutils/configtools/_util.py | 16 ++- .../extractorutils/configtools/elements.py | 98 +++++++-------- cognite/extractorutils/configtools/loaders.py | 52 ++++---- .../extractorutils/configtools/validators.py | 5 +- cognite/extractorutils/exceptions.py | 5 +- cognite/extractorutils/metrics.py | 36 +++--- cognite/extractorutils/statestore/_base.py | 7 +- cognite/extractorutils/statestore/hashing.py | 48 ++++---- .../extractorutils/statestore/watermark.py | 31 ++--- cognite/extractorutils/threading.py | 8 +- cognite/extractorutils/unstable/core/base.py | 12 +- cognite/extractorutils/uploader/_base.py | 16 +-- cognite/extractorutils/uploader/assets.py | 24 ++-- .../extractorutils/uploader/data_modeling.py | 26 ++-- cognite/extractorutils/uploader/events.py | 18 +-- cognite/extractorutils/uploader/files.py | 64 +++++----- cognite/extractorutils/uploader/raw.py | 20 +-- .../extractorutils/uploader/time_series.py | 114 +++++++++--------- cognite/extractorutils/uploader_extractor.py | 22 ++-- cognite/extractorutils/uploader_types.py | 16 +-- cognite/extractorutils/util.py | 44 ++++--- 24 files changed, 350 insertions(+), 369 deletions(-) diff --git a/cognite/extractorutils/__init__.py b/cognite/extractorutils/__init__.py index 9048cf0f..9aae77f1 100644 --- a/cognite/extractorutils/__init__.py +++ b/cognite/extractorutils/__init__.py @@ -18,3 +18,5 @@ __version__ = "7.5.4" from .base import Extractor + +__all__ = ["Extractor"] diff --git a/cognite/extractorutils/_inner_util.py b/cognite/extractorutils/_inner_util.py index f9377e7f..de7cc4e5 100644 --- a/cognite/extractorutils/_inner_util.py +++ b/cognite/extractorutils/_inner_util.py @@ -18,7 +18,7 @@ import json from decimal import Decimal -from typing import Any, Dict, Union +from typing import Any def _resolve_log_level(level: str) -> int: @@ -37,7 +37,7 @@ def resolve_log_level_for_httpx(level: str) -> str: class _DecimalEncoder(json.JSONEncoder): - def default(self, obj: Any) -> Dict[str, str]: + def default(self, obj: Any) -> dict[str, str]: if isinstance(obj, Decimal): return {"type": "decimal_encoded", "value": str(obj)} return super(_DecimalEncoder, self).default(obj) @@ -47,7 +47,7 @@ class _DecimalDecoder(json.JSONDecoder): def __init__(self, *args: Any, **kwargs: Any) -> None: json.JSONDecoder.__init__(self, *args, object_hook=self.object_hook, **kwargs) - def object_hook(self, obj_dict: Dict[str, str]) -> Union[Dict[str, str], Decimal]: + def object_hook(self, obj_dict: dict[str, str]) -> dict[str, str] | Decimal: if obj_dict.get("type") == "decimal_encoded": return Decimal(obj_dict["value"]) return obj_dict diff --git a/cognite/extractorutils/base.py b/cognite/extractorutils/base.py index 222dccd2..90480b53 100644 --- a/cognite/extractorutils/base.py +++ b/cognite/extractorutils/base.py @@ -19,7 +19,7 @@ from enum import Enum from threading import Thread from types import TracebackType -from typing import Any, Callable, Dict, Generic, Optional, Type, TypeVar +from typing import Any, Callable, Generic, Type, TypeVar from dotenv import find_dotenv, load_dotenv @@ -40,6 +40,7 @@ class ReloadConfigAction(Enum): CustomConfigClass = TypeVar("CustomConfigClass", bound=BaseConfig) +RunHandle = Callable[[CogniteClient, AbstractStateStore, CustomConfigClass, CancellationToken], None] class Extractor(Generic[CustomConfigClass]): @@ -68,27 +69,25 @@ class Extractor(Generic[CustomConfigClass]): heartbeat_waiting_time: Time interval between each heartbeat to the extraction pipeline in seconds. """ - _config_singleton: Optional[CustomConfigClass] = None - _statestore_singleton: Optional[AbstractStateStore] = None + _config_singleton: CustomConfigClass | None = None + _statestore_singleton: AbstractStateStore | None = None def __init__( self, *, name: str, description: str, - version: Optional[str] = None, - run_handle: Optional[ - Callable[[CogniteClient, AbstractStateStore, CustomConfigClass, CancellationToken], None] - ] = None, + version: str | None = None, + run_handle: RunHandle | None = None, config_class: Type[CustomConfigClass], - metrics: Optional[BaseMetrics] = None, + metrics: BaseMetrics | None = None, use_default_state_store: bool = True, - cancellation_token: Optional[CancellationToken] = None, - config_file_path: Optional[str] = None, + cancellation_token: CancellationToken | None = None, + config_file_path: str | None = None, continuous_extractor: bool = False, heartbeat_waiting_time: int = 600, handle_interrupts: bool = True, - reload_config_interval: Optional[int] = 300, + reload_config_interval: int | None = 300, reload_config_action: ReloadConfigAction = ReloadConfigAction.DO_NOTHING, ): self.name = name @@ -111,7 +110,7 @@ def __init__( self.cognite_client: CogniteClient self.state_store: AbstractStateStore self.config: CustomConfigClass - self.extraction_pipeline: Optional[ExtractionPipeline] + self.extraction_pipeline: ExtractionPipeline | None self.logger: logging.Logger self.should_be_restarted = False @@ -121,7 +120,7 @@ def __init__( else: self.metrics = BaseMetrics(extractor_name=name, extractor_version=self.version) - def _initial_load_config(self, override_path: Optional[str] = None) -> None: + def _initial_load_config(self, override_path: str | None = None) -> None: """ Load a configuration file, either from the specified path, or by a path specified by the user in a command line arg. Will quit further execution of no path is given. @@ -177,7 +176,7 @@ def _load_state_store(self) -> None: Either way, the state_store attribute is guaranteed to be set after calling this method. """ - def recursive_find_state_store(d: Dict[str, Any]) -> Optional[StateStoreConfig]: + def recursive_find_state_store(d: dict[str, Any]) -> StateStoreConfig | None: for k in d: if is_dataclass(d[k]): res = recursive_find_state_store(d[k].__dict__) @@ -323,7 +322,7 @@ def heartbeat_loop() -> None: return self def __exit__( - self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + self, exc_type: Type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None ) -> bool: """ Shuts down the extractor. Makes sure states are preserved, that all uploads of data and metrics are done, etc. diff --git a/cognite/extractorutils/configtools/_util.py b/cognite/extractorutils/configtools/_util.py index f42a8987..fc536787 100644 --- a/cognite/extractorutils/configtools/_util.py +++ b/cognite/extractorutils/configtools/_util.py @@ -14,7 +14,7 @@ import base64 import re from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import Any, Callable from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization as serialization @@ -24,7 +24,7 @@ from cognite.extractorutils.exceptions import InvalidConfigError -def _to_snake_case(dictionary: Dict[str, Any], case_style: str) -> Dict[str, Any]: +def _to_snake_case(dictionary: dict[str, Any], case_style: str) -> dict[str, Any]: """ Ensure that all keys in the dictionary follows the snake casing convention (recursively, so any sub-dictionaries are changed too). @@ -37,11 +37,11 @@ def _to_snake_case(dictionary: Dict[str, Any], case_style: str) -> Dict[str, Any An updated dictionary with keys in the given convention. """ - def fix_list(list_: List[Any], key_translator: Callable[[str], str]) -> List[Any]: + def fix_list(list_: list[Any], key_translator: Callable[[str], str]) -> list[Any]: if list_ is None: return [] - new_list: List[Any] = [None] * len(list_) + new_list: list[Any] = [None] * len(list_) for i, element in enumerate(list_): if isinstance(element, dict): new_list[i] = fix_dict(element, key_translator) @@ -51,11 +51,11 @@ def fix_list(list_: List[Any], key_translator: Callable[[str], str]) -> List[Any new_list[i] = element return new_list - def fix_dict(dict_: Dict[str, Any], key_translator: Callable[[str], str]) -> Dict[str, Any]: + def fix_dict(dict_: dict[str, Any], key_translator: Callable[[str], str]) -> dict[str, Any]: if dict_ is None: return {} - new_dict: Dict[str, Any] = {} + new_dict: dict[str, Any] = {} for key in dict_: if isinstance(dict_[key], dict): new_dict[key_translator(key)] = fix_dict(dict_[key], key_translator) @@ -81,9 +81,7 @@ def translate_camel(key: str) -> str: raise ValueError(f"Invalid case style: {case_style}") -def _load_certificate_data( - cert_path: str | Path, password: Optional[str] -) -> Union[Tuple[str, str], Tuple[bytes, bytes]]: +def _load_certificate_data(cert_path: str | Path, password: str | None) -> tuple[str, str] | tuple[bytes, bytes]: path = Path(cert_path) if isinstance(cert_path, str) else cert_path cert_data = Path(path).read_bytes() diff --git a/cognite/extractorutils/configtools/elements.py b/cognite/extractorutils/configtools/elements.py index 49754a0b..801d785a 100644 --- a/cognite/extractorutils/configtools/elements.py +++ b/cognite/extractorutils/configtools/elements.py @@ -19,7 +19,7 @@ from enum import Enum from logging.handlers import TimedRotatingFileHandler from time import sleep -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any from urllib.parse import urljoin, urlparse import yaml @@ -59,8 +59,8 @@ class CertificateConfig: """ path: str - password: Optional[str] - authority_url: Optional[str] = None + password: str | None + authority_url: str | None = None @dataclass @@ -70,15 +70,15 @@ class AuthenticatorConfig: """ client_id: str - scopes: List[str] - secret: Optional[str] = None - tenant: Optional[str] = None - token_url: Optional[str] = None - resource: Optional[str] = None - audience: Optional[str] = None + scopes: list[str] + secret: str | None = None + tenant: str | None = None + token_url: str | None = None + resource: str | None = None + audience: str | None = None authority: str = "https://login.microsoftonline.com/" min_ttl: float = 30 # minimum time to live: refresh token ahead of expiration - certificate: Optional[CertificateConfig] = None + certificate: CertificateConfig | None = None @dataclass @@ -88,13 +88,13 @@ class ConnectionConfig: """ disable_gzip: bool = False - status_forcelist: List[int] = field(default_factory=lambda: [429, 502, 503, 504]) + status_forcelist: list[int] = field(default_factory=lambda: [429, 502, 503, 504]) max_retries: int = 10 max_retries_connect: int = 3 max_retry_backoff: int = 30 max_connection_pool_size: int = 50 disable_ssl: bool = False - proxies: Dict[str, str] = field(default_factory=dict) + proxies: dict[str, str] = field(default_factory=dict) @dataclass @@ -104,8 +104,8 @@ class EitherIdConfig: An EitherId can only hold one ID type, not both. """ - id: Optional[int] - external_id: Optional[str] + id: int | None + external_id: str | None @property def either_id(self) -> EitherId: @@ -129,7 +129,7 @@ def __hash__(self) -> int: return hash(self._interval) @classmethod - def _parse_expression(cls, expression: str) -> Tuple[int, str]: + def _parse_expression(cls, expression: str) -> tuple[int, str]: # First, try to parse pure number and assume seconds (for backwards compatibility) try: return int(expression), f"{expression}s" @@ -189,7 +189,7 @@ def __init__(self, expression: str) -> None: self._bytes, self._expression = FileSizeConfig._parse_expression(expression) @classmethod - def _parse_expression(cls, expression: str) -> Tuple[int, str]: + def _parse_expression(cls, expression: str) -> tuple[int, str]: # First, try to parse pure number and assume bytes try: return int(expression), f"{expression}s" @@ -285,20 +285,20 @@ class CogniteConfig: project: str idp_authentication: AuthenticatorConfig - data_set: Optional[EitherIdConfig] = None - data_set_id: Optional[int] = None - data_set_external_id: Optional[str] = None - extraction_pipeline: Optional[EitherIdConfig] = None + data_set: EitherIdConfig | None = None + data_set_id: int | None = None + data_set_external_id: str | None = None + extraction_pipeline: EitherIdConfig | None = None timeout: TimeIntervalConfig = TimeIntervalConfig("30s") connection: ConnectionConfig = field(default_factory=ConnectionConfig) - security_categories: Optional[List[int]] = None + security_categories: list[int] | None = None external_id_prefix: str = "" host: str = "https://api.cognitedata.com" def get_cognite_client( self, client_name: str, - token_custom_args: Optional[Dict[str, str]] = None, + token_custom_args: dict[str, str] | None = None, use_experimental_sdk: bool = False, ) -> CogniteClient: from cognite.client.config import global_config @@ -345,7 +345,7 @@ def get_cognite_client( ) elif self.idp_authentication.secret: - kwargs: Dict[str, Any] = {} + kwargs: dict[str, Any] = {} if self.idp_authentication.token_url: _validate_https_url(self.idp_authentication.token_url, "Token URL") kwargs["token_url"] = self.idp_authentication.token_url @@ -388,7 +388,7 @@ def get_cognite_client( return CogniteClient(client_config) - def get_data_set(self, cdf_client: CogniteClient) -> Optional[DataSet]: + def get_data_set(self, cdf_client: CogniteClient) -> DataSet | None: if self.data_set_external_id: logging.getLogger(__name__).warning( "Using data-set-external-id is deprecated, please use data-set/external-id instead" @@ -407,7 +407,7 @@ def get_data_set(self, cdf_client: CogniteClient) -> Optional[DataSet]: external_id=self.data_set.either_id.external_id, ) - def get_extraction_pipeline(self, cdf_client: CogniteClient) -> Optional[ExtractionPipeline]: + def get_extraction_pipeline(self, cdf_client: CogniteClient) -> ExtractionPipeline | None: if not self.extraction_pipeline: return None @@ -439,12 +439,12 @@ class LoggingConfig: Logging settings, such as log levels and path to log file """ - console: Optional[_ConsoleLoggingConfig] - file: Optional[_FileLoggingConfig] + console: _ConsoleLoggingConfig | None + file: _FileLoggingConfig | None # enables metrics on the number of log messages recorded (per logger and level) # In order to collect/see result MetricsConfig should be set as well, so metrics are propagated to # Prometheus and/or Cognite - metrics: Optional[bool] = False + metrics: bool | None = False def setup_logging(self, suppress_console: bool = False) -> None: """ @@ -505,10 +505,10 @@ def setup_logging(self, suppress_console: bool = False) -> None: class _PushGatewayConfig: host: str job_name: str - username: Optional[str] - password: Optional[str] + username: str | None + password: str | None - clear_after: Optional[TimeIntervalConfig] + clear_after: TimeIntervalConfig | None push_interval: TimeIntervalConfig = TimeIntervalConfig("30s") @@ -520,9 +520,9 @@ class _PromServerConfig: @dataclass class _CogniteMetricsConfig: external_id_prefix: str - asset_name: Optional[str] - asset_external_id: Optional[str] - data_set: Optional[EitherIdConfig] = None + asset_name: str | None + asset_external_id: str | None + data_set: EitherIdConfig | None = None push_interval: TimeIntervalConfig = TimeIntervalConfig("30s") @@ -534,13 +534,13 @@ class MetricsConfig: Series. """ - push_gateways: Optional[List[_PushGatewayConfig]] - cognite: Optional[_CogniteMetricsConfig] - server: Optional[_PromServerConfig] + push_gateways: list[_PushGatewayConfig] | None + cognite: _CogniteMetricsConfig | None + server: _PromServerConfig | None - def start_pushers(self, cdf_client: CogniteClient, cancellation_token: Optional[CancellationToken] = None) -> None: - self._pushers: List[AbstractMetricsPusher] = [] - self._clear_on_stop: Dict[PrometheusPusher, int] = {} + def start_pushers(self, cdf_client: CogniteClient, cancellation_token: CancellationToken | None = None) -> None: + self._pushers: list[AbstractMetricsPusher] = [] + self._clear_on_stop: dict[PrometheusPusher, int] = {} push_gateways = self.push_gateways or [] @@ -608,9 +608,9 @@ class ConfigType(Enum): @dataclass class _BaseConfig: - _file_hash: Optional[str] = field(init=False, repr=False, default=None) + _file_hash: str | None = field(init=False, repr=False, default=None) - type: Optional[ConfigType] + type: ConfigType | None cognite: CogniteConfig @@ -620,7 +620,7 @@ class BaseConfig(_BaseConfig): Basis for an extractor config, containing config version, ``CogniteConfig`` and ``LoggingConfig`` """ - version: Optional[Union[str, int]] + version: str | int | None logger: LoggingConfig @@ -659,14 +659,14 @@ class StateStoreConfig: Configuration of the State Store, containing ``LocalStateStoreConfig`` or ``RawStateStoreConfig`` """ - raw: Optional[RawStateStoreConfig] = None - local: Optional[LocalStateStoreConfig] = None + raw: RawStateStoreConfig | None = None + local: LocalStateStoreConfig | None = None def create_state_store( self, - cdf_client: Optional[CogniteClient] = None, + cdf_client: CogniteClient | None = None, default_to_local: bool = True, - cancellation_token: Optional[CancellationToken] = None, + cancellation_token: CancellationToken | None = None, ) -> AbstractStateStore: """ Create a state store object based on the config. @@ -728,8 +728,8 @@ class IgnorePattern: """ pattern: str - options: Optional[list[RegExpFlag]] = None - flags: Optional[list[RegExpFlag]] = None + options: list[RegExpFlag] | None = None + flags: list[RegExpFlag] | None = None def compile(self) -> re.Pattern[str]: """ diff --git a/cognite/extractorutils/configtools/loaders.py b/cognite/extractorutils/configtools/loaders.py index 8e0cc1e9..7f33af7a 100644 --- a/cognite/extractorutils/configtools/loaders.py +++ b/cognite/extractorutils/configtools/loaders.py @@ -22,7 +22,7 @@ from enum import Enum from hashlib import sha256 from pathlib import Path -from typing import Any, Callable, Dict, Generic, Iterable, List, Optional, TextIO, Type, TypeVar, Union, cast +from typing import Any, Callable, Generic, Iterable, TextIO, Type, TypeVar, cast import dacite import yaml @@ -61,11 +61,11 @@ class KeyVaultLoader: Class responsible for configuring keyvault for clients using Azure """ - def __init__(self, config: Optional[dict]): + def __init__(self, config: dict | None): self.config = config - self.credentials: Optional[TokenCredential] = None - self.client: Optional[SecretClient] = None + self.credentials: TokenCredential | None = None + self.client: SecretClient | None = None def _init_client(self) -> None: from dotenv import find_dotenv, load_dotenv @@ -148,10 +148,10 @@ def _env_constructor(_: yaml.SafeLoader, node: yaml.Node) -> bool: def _load_yaml_dict_raw( - source: Union[TextIO, str], + source: TextIO | str, expand_envvars: bool = True, - keyvault_loader: Optional[KeyVaultLoader] = None, -) -> Dict[str, Any]: + keyvault_loader: KeyVaultLoader | None = None, +) -> dict[str, Any]: loader = _EnvLoader if expand_envvars else yaml.SafeLoader class SafeLoaderIgnoreUnknown(yaml.SafeLoader): @@ -190,12 +190,12 @@ def ignore_unknown(self, node: yaml.Node) -> None: def _load_yaml_dict( - source: Union[TextIO, str], + source: TextIO | str, case_style: str = "hyphen", expand_envvars: bool = True, - dict_manipulator: Callable[[Dict[str, Any]], Dict[str, Any]] = lambda x: x, - keyvault_loader: Optional[KeyVaultLoader] = None, -) -> Dict[str, Any]: + dict_manipulator: Callable[[dict[str, Any]], dict[str, Any]] = lambda x: x, + keyvault_loader: KeyVaultLoader | None = None, +) -> dict[str, Any]: config_dict = _load_yaml_dict_raw(source, expand_envvars, keyvault_loader) config_dict = dict_manipulator(config_dict) @@ -210,12 +210,12 @@ def _load_yaml_dict( def _load_yaml( - source: Union[TextIO, str], + source: TextIO | str, config_type: Type[CustomConfigClass], case_style: str = "hyphen", expand_envvars: bool = True, - dict_manipulator: Callable[[Dict[str, Any]], Dict[str, Any]] = lambda x: x, - keyvault_loader: Optional[KeyVaultLoader] = None, + dict_manipulator: Callable[[dict[str, Any]], dict[str, Any]] = lambda x: x, + keyvault_loader: KeyVaultLoader | None = None, ) -> CustomConfigClass: config_dict = _load_yaml_dict( source, @@ -267,11 +267,11 @@ def all_types(type_: Type) -> Iterable[Type]: def load_yaml( - source: Union[TextIO, str], + source: TextIO | str, config_type: Type[CustomConfigClass], case_style: str = "hyphen", expand_envvars: bool = True, - keyvault_loader: Optional[KeyVaultLoader] = None, + keyvault_loader: KeyVaultLoader | None = None, ) -> CustomConfigClass: """ Read a YAML file, and create a config object based on its contents. @@ -300,11 +300,11 @@ def load_yaml( def load_yaml_dict( - source: Union[TextIO, str], + source: TextIO | str, case_style: str = "hyphen", expand_envvars: bool = True, - keyvault_loader: Optional[KeyVaultLoader] = None, -) -> Dict[str, Any]: + keyvault_loader: KeyVaultLoader | None = None, +) -> dict[str, Any]: """ Read a YAML file and return a dictionary from its contents @@ -326,9 +326,9 @@ def load_yaml_dict( ) -def compile_patterns(ignore_patterns: List[Union[str, IgnorePattern]]) -> list[re.Pattern[str]]: +def compile_patterns(ignore_patterns: list[str | IgnorePattern]) -> list[re.Pattern[str]]: """ - List of patterns to compile + list of patterns to compile Args: ignore_patterns: A list of strings or IgnorePattern to be compiled. @@ -350,17 +350,17 @@ def __init__(self, config_path: str, config_type: Type[CustomConfigClass]): self.config_path = config_path self.config_type = config_type - self._config: Optional[CustomConfigClass] = None - self._next_config: Optional[CustomConfigClass] = None + self._config: CustomConfigClass | None = None + self._next_config: CustomConfigClass | None = None - self._cognite_client: Optional[CogniteClient] = None + self._cognite_client: CogniteClient | None = None def _reload_file(self) -> None: with open(self.config_path, "r") as stream: self._config_text = stream.read() @property - def cognite_client(self) -> Optional[CogniteClient]: + def cognite_client(self) -> CogniteClient | None: if self._cognite_client is None and self._config is not None: self._cognite_client = self._config.cognite.get_cognite_client("config_resolver") return self._cognite_client @@ -412,7 +412,7 @@ def from_cli( return cls(args.config[0], config_type) - def _inject_cognite(self, local_part: _BaseConfig, remote_part: Dict[str, Any]) -> Dict[str, Any]: + def _inject_cognite(self, local_part: _BaseConfig, remote_part: dict[str, Any]) -> dict[str, Any]: # We can not dump 'local_part.cognite' directly because e.g. 'data_set' may be set remote only... remote_part.setdefault("cognite", {}) remote_part["cognite"]["idp_authentication"] = dataclasses.asdict(local_part.cognite.idp_authentication) diff --git a/cognite/extractorutils/configtools/validators.py b/cognite/extractorutils/configtools/validators.py index 3404456a..bec42ae9 100644 --- a/cognite/extractorutils/configtools/validators.py +++ b/cognite/extractorutils/configtools/validators.py @@ -1,11 +1,10 @@ import logging import re -from typing import Union _logger = logging.getLogger(__name__) -def matches_patterns(patterns: list[Union[str, re.Pattern[str]]], string: str) -> bool: +def matches_patterns(patterns: list[str | re.Pattern[str]], string: str) -> bool: """ Check string against list of RegExp patterns. @@ -19,7 +18,7 @@ def matches_patterns(patterns: list[Union[str, re.Pattern[str]]], string: str) - return any([matches_pattern(pattern, string) for pattern in patterns]) -def matches_pattern(pattern: Union[str, re.Pattern[str]], string: str) -> bool: +def matches_pattern(pattern: str | re.Pattern[str], string: str) -> bool: """ Match pattern against a string. diff --git a/cognite/extractorutils/exceptions.py b/cognite/extractorutils/exceptions.py index a0de9b56..783f4b31 100644 --- a/cognite/extractorutils/exceptions.py +++ b/cognite/extractorutils/exceptions.py @@ -13,9 +13,6 @@ # limitations under the License. -from typing import List, Optional - - class InvalidConfigError(Exception): """ Exception thrown from ``load_yaml`` and ``load_yaml_dict`` if config file is invalid. This can be due to @@ -25,7 +22,7 @@ class InvalidConfigError(Exception): * Unkown fields """ - def __init__(self, message: str, details: Optional[List[str]] = None): + def __init__(self, message: str, details: list[str] | None = None): super(InvalidConfigError, self).__init__() self.message = message self.details = details diff --git a/cognite/extractorutils/metrics.py b/cognite/extractorutils/metrics.py index 62362d19..75e199e3 100644 --- a/cognite/extractorutils/metrics.py +++ b/cognite/extractorutils/metrics.py @@ -43,7 +43,7 @@ def __init__(self): from abc import ABC, abstractmethod from time import sleep from types import TracebackType -from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar, Union +from typing import Any, Callable, Type, TypeVar import arrow import psutil @@ -177,14 +177,14 @@ class AbstractMetricsPusher(ABC): def __init__( self, - push_interval: Optional[int] = None, - thread_name: Optional[str] = None, - cancellation_token: Optional[CancellationToken] = None, + push_interval: int | None = None, + thread_name: str | None = None, + cancellation_token: CancellationToken | None = None, ): self.push_interval = push_interval self.thread_name = thread_name - self.thread: Optional[threading.Thread] = None + self.thread: threading.Thread | None = None self.thread_name = thread_name self.cancellation_token = cancellation_token.create_child_token() if cancellation_token else CancellationToken() @@ -232,7 +232,7 @@ def __enter__(self) -> "AbstractMetricsPusher": return self def __exit__( - self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + self, exc_type: Type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None ) -> None: """ Wraps around stop method, for use as context manager @@ -264,10 +264,10 @@ def __init__( job_name: str, url: str, push_interval: int, - username: Optional[str] = None, - password: Optional[str] = None, - thread_name: Optional[str] = None, - cancellation_token: Optional[CancellationToken] = None, + username: str | None = None, + password: str | None = None, + thread_name: str | None = None, + cancellation_token: CancellationToken | None = None, ): super(PrometheusPusher, self).__init__(push_interval, thread_name, cancellation_token) @@ -277,7 +277,7 @@ def __init__( self.url = url - def _auth_handler(self, url: str, method: str, timeout: int, headers: List[Tuple[str, str]], data: Any) -> Callable: + def _auth_handler(self, url: str, method: str, timeout: int, headers: list[tuple[str, str]], data: Any) -> Callable: """ Returns a authentication handler against the Prometheus Pushgateway to use in the pushadd_to_gateway method. @@ -340,10 +340,10 @@ def __init__( cdf_client: CogniteClient, external_id_prefix: str, push_interval: int, - asset: Optional[Asset] = None, - data_set: Optional[EitherId] = None, - thread_name: Optional[str] = None, - cancellation_token: Optional[CancellationToken] = None, + asset: Asset | None = None, + data_set: EitherId | None = None, + thread_name: str | None = None, + cancellation_token: CancellationToken | None = None, ): super(CognitePusher, self).__init__(push_interval, thread_name, cancellation_token) @@ -360,11 +360,11 @@ def _init_cdf(self) -> None: """ Initialize the CDF tenant with the necessary time series and asset. """ - time_series: List[TimeSeries] = [] + time_series: list[TimeSeries] = [] if self.asset is not None: # Ensure that asset exist, and retrieve internal ID - asset: Optional[Asset] + asset: Asset | None try: asset = self.cdf_client.assets.create(self.asset) except CogniteDuplicatedError: @@ -406,7 +406,7 @@ def _push_to_server(self) -> None: """ timestamp = int(arrow.get().float_timestamp * 1000) - datapoints: List[Dict[str, Union[str, int, List[Any], Datapoints, DatapointsArray]]] = [] + datapoints: list[dict[str, str | int | list[Any] | Datapoints | DatapointsArray]] = [] for metric in REGISTRY.collect(): if type(metric) == Metric and metric.type in ["gauge", "counter"]: diff --git a/cognite/extractorutils/statestore/_base.py b/cognite/extractorutils/statestore/_base.py index dd1928e6..a30843df 100644 --- a/cognite/extractorutils/statestore/_base.py +++ b/cognite/extractorutils/statestore/_base.py @@ -1,7 +1,6 @@ import logging import threading from abc import ABC, abstractmethod -from typing import Optional from cognite.extractorutils._inner_util import _resolve_log_level from cognite.extractorutils.threading import CancellationToken @@ -15,10 +14,10 @@ class _BaseStateStore(ABC): def __init__( self, - save_interval: Optional[int] = None, + save_interval: int | None = None, trigger_log_level: str = "DEBUG", - thread_name: Optional[str] = None, - cancellation_token: Optional[CancellationToken] = None, + thread_name: str | None = None, + cancellation_token: CancellationToken | None = None, ) -> None: self._initialized = False diff --git a/cognite/extractorutils/statestore/hashing.py b/cognite/extractorutils/statestore/hashing.py index 26c21ab4..1edd402e 100644 --- a/cognite/extractorutils/statestore/hashing.py +++ b/cognite/extractorutils/statestore/hashing.py @@ -2,7 +2,7 @@ import json from abc import ABC from types import TracebackType -from typing import Any, Dict, Iterable, Iterator, Optional, Set, Type +from typing import Any, Iterable, Iterator, Type import orjson @@ -19,10 +19,10 @@ class AbstractHashStateStore(_BaseStateStore, ABC): def __init__( self, - save_interval: Optional[int] = None, + save_interval: int | None = None, trigger_log_level: str = "DEBUG", - thread_name: Optional[str] = None, - cancellation_token: Optional[CancellationToken] = None, + thread_name: str | None = None, + cancellation_token: CancellationToken | None = None, ) -> None: super().__init__( save_interval=save_interval, @@ -31,31 +31,31 @@ def __init__( cancellation_token=cancellation_token, ) - self._local_state: Dict[str, Dict[str, str]] = {} - self._seen: Set[str] = set() + self._local_state: dict[str, dict[str, str]] = {} + self._seen: set[str] = set() - def get_state(self, external_id: str) -> Optional[str]: + def get_state(self, external_id: str) -> str | None: with self.lock: return self._local_state.get(external_id, {}).get("digest") - def _hash_row(self, data: Dict[str, Any]) -> str: + def _hash_row(self, data: dict[str, Any]) -> str: return hashlib.sha256(orjson.dumps(data, option=orjson.OPT_SORT_KEYS)).hexdigest() - def set_state(self, external_id: str, data: Dict[str, Any]) -> None: + def set_state(self, external_id: str, data: dict[str, Any]) -> None: with self.lock: self._local_state[external_id] = {"digest": self._hash_row(data)} - def has_changed(self, external_id: str, data: Dict[str, Any]) -> bool: + def has_changed(self, external_id: str, data: dict[str, Any]) -> bool: with self.lock: if external_id not in self._local_state: return True return self._hash_row(data) != self._local_state[external_id]["digest"] - def __getitem__(self, external_id: str) -> Optional[str]: + def __getitem__(self, external_id: str) -> str | None: return self.get_state(external_id) - def __setitem__(self, key: str, value: Dict[str, Any]) -> None: + def __setitem__(self, key: str, value: dict[str, Any]) -> None: self.set_state(external_id=key, data=value) def __contains__(self, external_id: str) -> bool: @@ -76,10 +76,10 @@ def __init__( cdf_client: CogniteClient, database: str, table: str, - save_interval: Optional[int] = None, + save_interval: int | None = None, trigger_log_level: str = "DEBUG", - thread_name: Optional[str] = None, - cancellation_token: Optional[CancellationToken] = None, + thread_name: str | None = None, + cancellation_token: CancellationToken | None = None, ) -> None: super().__init__( save_interval=save_interval, @@ -169,9 +169,9 @@ def __enter__(self) -> "RawHashStateStore": def __exit__( self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], + exc_type: Type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, ) -> None: """ Wraps around stop method, for use as context manager @@ -188,10 +188,10 @@ class LocalHashStateStore(AbstractHashStateStore): def __init__( self, file_path: str, - save_interval: Optional[int] = None, + save_interval: int | None = None, trigger_log_level: str = "DEBUG", - thread_name: Optional[str] = None, - cancellation_token: Optional[CancellationToken] = None, + thread_name: str | None = None, + cancellation_token: CancellationToken | None = None, ) -> None: super().__init__( save_interval=save_interval, @@ -243,9 +243,9 @@ def __enter__(self) -> "LocalHashStateStore": def __exit__( self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], + exc_type: Type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, ) -> None: """ Wraps around stop method, for use as context manager diff --git a/cognite/extractorutils/statestore/watermark.py b/cognite/extractorutils/statestore/watermark.py index f29a20ea..31eed056 100644 --- a/cognite/extractorutils/statestore/watermark.py +++ b/cognite/extractorutils/statestore/watermark.py @@ -88,7 +88,7 @@ import json from abc import ABC from types import TracebackType -from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Type, Union +from typing import Any, Callable, Dict, Iterator, List, Tuple, Type, Union from cognite.client import CogniteClient from cognite.client.exceptions import CogniteAPIError @@ -114,10 +114,10 @@ class AbstractStateStore(_BaseStateStore, ABC): def __init__( self, - save_interval: Optional[int] = None, + save_interval: int | None = None, trigger_log_level: str = "DEBUG", - thread_name: Optional[str] = None, - cancellation_token: Optional[CancellationToken] = None, + thread_name: str | None = None, + cancellation_token: CancellationToken | None = None, ): super().__init__( save_interval=save_interval, @@ -152,7 +152,7 @@ def get_state(self, external_id: Union[str, List[str]]) -> Union[Tuple[Any, Any] state = self._local_state.get(external_id, {}) return state.get("low"), state.get("high") - def set_state(self, external_id: str, low: Optional[Any] = None, high: Optional[Any] = None) -> None: + def set_state(self, external_id: str, low: Any | None = None, high: Any | None = None) -> None: """ Set/update state of a singe external ID. @@ -166,7 +166,7 @@ def set_state(self, external_id: str, low: Optional[Any] = None, high: Optional[ state["low"] = low if low is not None else state.get("low") state["high"] = high if high is not None else state.get("high") - def expand_state(self, external_id: str, low: Optional[Any] = None, high: Optional[Any] = None) -> None: + def expand_state(self, external_id: str, low: Any | None = None, high: Any | None = None) -> None: """ Like set_state, but only sets state if the proposed state is outside the stored state. That is if e.g. low is lower than the stored low. @@ -275,10 +275,10 @@ def __init__( cdf_client: CogniteClient, database: str, table: str, - save_interval: Optional[int] = None, + save_interval: int | None = None, trigger_log_level: str = "DEBUG", - thread_name: Optional[str] = None, - cancellation_token: Optional[CancellationToken] = None, + thread_name: str | None = None, + cancellation_token: CancellationToken | None = None, ): super().__init__(save_interval, trigger_log_level, thread_name, cancellation_token) @@ -380,7 +380,7 @@ def __enter__(self) -> "RawStateStore": return self def __exit__( - self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + self, exc_type: Type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None ) -> None: """ Wraps around stop method, for use as context manager @@ -409,10 +409,10 @@ class LocalStateStore(AbstractStateStore): def __init__( self, file_path: str, - save_interval: Optional[int] = None, + save_interval: int | None = None, trigger_log_level: str = "DEBUG", - thread_name: Optional[str] = None, - cancellation_token: Optional[CancellationToken] = None, + thread_name: str | None = None, + cancellation_token: CancellationToken | None = None, ): super().__init__(save_interval, trigger_log_level, thread_name, cancellation_token) @@ -459,7 +459,10 @@ def __enter__(self) -> "LocalStateStore": return self def __exit__( - self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + self, + exc_type: Type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, ) -> None: """ Wraps around stop method, for use as context manager diff --git a/cognite/extractorutils/threading.py b/cognite/extractorutils/threading.py index 30c92fbf..bd4ddaed 100644 --- a/cognite/extractorutils/threading.py +++ b/cognite/extractorutils/threading.py @@ -2,7 +2,7 @@ import signal from threading import Condition from time import time -from typing import Any, Optional +from typing import Any class CancellationToken: @@ -14,10 +14,10 @@ class CancellationToken: cancelled if the parent is cancelled, but can be canceled alone without affecting the parent token. """ - def __init__(self, condition: Optional[Condition] = None) -> None: + def __init__(self, condition: Condition | None = None) -> None: self._cv: Condition = condition or Condition() self._is_cancelled_int: bool = False - self._parent: Optional["CancellationToken"] = None + self._parent: "CancellationToken" | None = None def __repr__(self) -> str: cls = self.__class__ @@ -59,7 +59,7 @@ def set(self) -> None: """ self.cancel() - def wait(self, timeout: Optional[float] = None) -> bool: + def wait(self, timeout: float | None = None) -> bool: endtime = None if timeout is not None: endtime = time() + timeout diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index 7f3ba12c..15ec136d 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -8,7 +8,7 @@ from threading import RLock, Thread from traceback import format_exception from types import TracebackType -from typing import Generic, Literal, Optional, Type, TypeVar, Union +from typing import Generic, Literal, Type, TypeVar from humps import pascalize from typing_extensions import Self, assert_never @@ -33,7 +33,7 @@ __all__ = ["ConfigType", "ConfigRevision", "Extractor"] ConfigType = TypeVar("ConfigType", bound=ExtractorConfig) -ConfigRevision = Union[Literal["local"], int] +ConfigRevision = Literal["local"] | int _T = TypeVar("_T", bound=ExtractorConfig) @@ -75,7 +75,7 @@ def __init__(self, config: FullConfig[ConfigType]) -> None: self.cognite_client = self.connection_config.get_cognite_client(f"{self.EXTERNAL_ID}-{self.VERSION}") self._checkin_lock = RLock() - self._runtime_messages: Optional[Queue[RuntimeMessage]] = None + self._runtime_messages: Queue[RuntimeMessage] | None = None self._scheduler = TaskScheduler(self.cancellation_token.create_child_token()) @@ -305,9 +305,9 @@ def __enter__(self) -> Self: def __exit__( self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], + exc_type: Type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, ) -> bool: self.stop() with self._checkin_lock: diff --git a/cognite/extractorutils/uploader/_base.py b/cognite/extractorutils/uploader/_base.py index b5fb74fe..cbff8f81 100644 --- a/cognite/extractorutils/uploader/_base.py +++ b/cognite/extractorutils/uploader/_base.py @@ -16,7 +16,7 @@ import threading from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import Any, Callable, List, Optional +from typing import Any, Callable from arrow import Arrow @@ -43,12 +43,12 @@ class AbstractUploadQueue(ABC): def __init__( self, cdf_client: CogniteClient, - post_upload_function: Optional[Callable[[List[Any]], None]] = None, - max_queue_size: Optional[int] = None, - max_upload_interval: Optional[int] = None, + post_upload_function: Callable[[list[Any]], None] | None = None, + max_queue_size: int | None = None, + max_upload_interval: int | None = None, trigger_log_level: str = "DEBUG", - thread_name: Optional[str] = None, - cancellation_token: Optional[CancellationToken] = None, + thread_name: str | None = None, + cancellation_token: CancellationToken | None = None, ): self.cdf_client = cdf_client @@ -81,12 +81,12 @@ def _check_triggers(self) -> None: return None - def _post_upload(self, uploaded: List[Any]) -> None: + def _post_upload(self, uploaded: list[Any]) -> None: """ Perform post_upload_function to uploaded data, if applicable Args: - uploaded: List of uploaded data + uploaded: list of uploaded data """ if self.post_upload_function is not None: try: diff --git a/cognite/extractorutils/uploader/assets.py b/cognite/extractorutils/uploader/assets.py index ebc5576e..fc80bd16 100644 --- a/cognite/extractorutils/uploader/assets.py +++ b/cognite/extractorutils/uploader/assets.py @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any, Callable, List, Optional, Type +from types import TracebackType +from typing import Any, Callable, Type from cognite.client import CogniteClient from cognite.client.data_classes.assets import Asset @@ -52,12 +53,12 @@ class AssetUploadQueue(AbstractUploadQueue): def __init__( self, cdf_client: CogniteClient, - post_upload_function: Optional[Callable[[List[Any]], None]] = None, - max_queue_size: Optional[int] = None, - max_upload_interval: Optional[int] = None, + post_upload_function: Callable[[list[Any]], None] | None = None, + max_queue_size: int | None = None, + max_upload_interval: int | None = None, trigger_log_level: str = "DEBUG", - thread_name: Optional[str] = None, - cancellation_token: Optional[CancellationToken] = None, + thread_name: str | None = None, + cancellation_token: CancellationToken | None = None, ): super().__init__( cdf_client, @@ -68,7 +69,7 @@ def __init__( thread_name, cancellation_token, ) - self.upload_queue: List[Asset] = [] + self.upload_queue: list[Asset] = [] self.assets_queued = ASSETS_UPLOADER_QUEUED self.assets_written = ASSETS_UPLOADER_WRITTEN self.queue_size = ASSETS_UPLOADER_QUEUE_SIZE @@ -106,7 +107,7 @@ def _upload_batch() -> None: self.cdf_client.assets.create(self.upload_queue) except CogniteDuplicatedError as e: duplicated_ids = set([dup["externalId"] for dup in e.duplicated if "externalId" in dup]) - failed: List[Asset] = [e for e in e.failed] + failed: list[Asset] = [e for e in e.failed] to_create = [] to_update = [] for asset in failed: @@ -144,7 +145,12 @@ def __enter__(self) -> "AssetUploadQueue": self.start() return self - def __exit__(self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException]) -> None: + def __exit__( + self, + exc_type: Type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: """ Wraps around stop method, for use as context manager diff --git a/cognite/extractorutils/uploader/data_modeling.py b/cognite/extractorutils/uploader/data_modeling.py index f000cd18..dbdacc28 100644 --- a/cognite/extractorutils/uploader/data_modeling.py +++ b/cognite/extractorutils/uploader/data_modeling.py @@ -1,5 +1,5 @@ from types import TracebackType -from typing import Any, Callable, List, Optional, Type +from typing import Any, Callable, Type from cognite.client import CogniteClient from cognite.client.data_classes.data_modeling import EdgeApply, NodeApply @@ -18,12 +18,12 @@ class InstanceUploadQueue(AbstractUploadQueue): def __init__( self, cdf_client: CogniteClient, - post_upload_function: Optional[Callable[[List[Any]], None]] = None, - max_queue_size: Optional[int] = None, - max_upload_interval: Optional[int] = None, + post_upload_function: Callable[[list[Any]], None] | None = None, + max_queue_size: int | None = None, + max_upload_interval: int | None = None, trigger_log_level: str = "DEBUG", - thread_name: Optional[str] = None, - cancellation_token: Optional[CancellationToken] = None, + thread_name: str | None = None, + cancellation_token: CancellationToken | None = None, auto_create_start_nodes: bool = True, auto_create_end_nodes: bool = True, auto_create_direct_relations: bool = True, @@ -42,14 +42,14 @@ def __init__( self.auto_create_end_nodes = auto_create_end_nodes self.auto_create_direct_relations = auto_create_direct_relations - self.node_queue: List[NodeApply] = [] - self.edge_queue: List[EdgeApply] = [] + self.node_queue: list[NodeApply] = [] + self.edge_queue: list[EdgeApply] = [] def add_to_upload_queue( self, *, - node_data: Optional[List[NodeApply]] = None, - edge_data: Optional[List[EdgeApply]] = None, + node_data: list[NodeApply] | None = None, + edge_data: list[EdgeApply] | None = None, ) -> None: if node_data: with self.lock: @@ -100,9 +100,9 @@ def __enter__(self) -> "InstanceUploadQueue": def __exit__( self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], + exc_type: Type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, ) -> None: """ Wraps around stop method, for use as context manager diff --git a/cognite/extractorutils/uploader/events.py b/cognite/extractorutils/uploader/events.py index 7edb5cd5..2a922bed 100644 --- a/cognite/extractorutils/uploader/events.py +++ b/cognite/extractorutils/uploader/events.py @@ -13,7 +13,7 @@ # limitations under the License. from types import TracebackType -from typing import Callable, List, Optional, Type +from typing import Callable, Type from cognite.client import CogniteClient from cognite.client.data_classes import Event @@ -52,12 +52,12 @@ class EventUploadQueue(AbstractUploadQueue): def __init__( self, cdf_client: CogniteClient, - post_upload_function: Optional[Callable[[List[Event]], None]] = None, - max_queue_size: Optional[int] = None, - max_upload_interval: Optional[int] = None, + post_upload_function: Callable[[list[Event]], None] | None = None, + max_queue_size: int | None = None, + max_upload_interval: int | None = None, trigger_log_level: str = "DEBUG", - thread_name: Optional[str] = None, - cancellation_token: Optional[CancellationToken] = None, + thread_name: str | None = None, + cancellation_token: CancellationToken | None = None, ): # Super sets post_upload and threshold super().__init__( @@ -70,7 +70,7 @@ def __init__( cancellation_token, ) - self.upload_queue: List[Event] = [] + self.upload_queue: list[Event] = [] self.events_queued = EVENTS_UPLOADER_QUEUED self.events_written = EVENTS_UPLOADER_WRITTEN @@ -110,7 +110,7 @@ def _upload_batch() -> None: self.cdf_client.events.create([e for e in self.upload_queue]) except CogniteDuplicatedError as e: duplicated_ids = set([dup["externalId"] for dup in e.duplicated if "externalId" in dup]) - failed: List[Event] = [e for e in e.failed] + failed: list[Event] = [e for e in e.failed] to_create = [] to_update = [] for evt in failed: @@ -151,7 +151,7 @@ def __enter__(self) -> "EventUploadQueue": return self def __exit__( - self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + self, exc_type: Type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None ) -> None: """ Wraps around stop method, for use as context manager diff --git a/cognite/extractorutils/uploader/files.py b/cognite/extractorutils/uploader/files.py index 8da265fe..116945af 100644 --- a/cognite/extractorutils/uploader/files.py +++ b/cognite/extractorutils/uploader/files.py @@ -22,13 +22,8 @@ Any, BinaryIO, Callable, - Dict, Iterator, - List, - Optional, - Tuple, Type, - Union, ) from urllib.parse import ParseResult, urlparse @@ -67,7 +62,7 @@ _CDF_ALPHA_VERSION_HEADER = {"cdf-version": "alpha"} -FileMetadataOrCogniteExtractorFile = Union[FileMetadata, CogniteExtractorFileApply] +FileMetadataOrCogniteExtractorFile = FileMetadata | CogniteExtractorFileApply class ChunkedStream(RawIOBase, BinaryIO): @@ -111,9 +106,9 @@ def __enter__(self) -> "ChunkedStream": def __exit__( self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], + exc_type: Type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, ) -> None: return super().__exit__(exc_type, exc_val, exc_tb) @@ -202,13 +197,13 @@ class IOFileUploadQueue(AbstractUploadQueue): def __init__( self, cdf_client: CogniteClient, - post_upload_function: Optional[Callable[[List[FileMetadataOrCogniteExtractorFile]], None]] = None, - max_queue_size: Optional[int] = None, + post_upload_function: Callable[[list[FileMetadataOrCogniteExtractorFile]], None] | None = None, + max_queue_size: int | None = None, trigger_log_level: str = "DEBUG", - thread_name: Optional[str] = None, + thread_name: str | None = None, overwrite_existing: bool = False, - cancellation_token: Optional[CancellationToken] = None, - max_parallelism: Optional[int] = None, + cancellation_token: CancellationToken | None = None, + max_parallelism: int | None = None, ): # Super sets post_upload and threshold super().__init__( @@ -224,8 +219,8 @@ def __init__( if self.threshold <= 0: raise ValueError("Max queue size must be positive for file upload queues") - self.upload_queue: List[Future] = [] - self.errors: List[Exception] = [] + self.upload_queue: list[Future] = [] + self.errors: list[Exception] = [] self.overwrite_existing = overwrite_existing @@ -398,12 +393,7 @@ def add_io_to_upload_queue( self, file_meta: FileMetadataOrCogniteExtractorFile, read_file: Callable[[], BinaryIO], - extra_retries: Optional[ - Union[ - Tuple[Type[Exception], ...], - Dict[Type[Exception], Callable[[Any], bool]], - ] - ] = None, + extra_retries: tuple[Type[Exception], ...] | dict[Type[Exception], Callable[[Any], bool]] | None = None, ) -> None: """ Add file to upload queue. The file will start uploading immedeately. If the size of the queue is larger than @@ -484,7 +474,7 @@ def wrapped_upload( self.queue_size.set(self.upload_queue_size) def _get_file_upload_request( - self, url_str: str, stream: BinaryIO, size: int, mime_type: Optional[str] = None + self, url_str: str, stream: BinaryIO, size: int, mime_type: str | None = None ) -> Request: url = URL(url_str) base_url = URL(self.cdf_client.config.base_url) @@ -528,7 +518,7 @@ def _create_cdm(self, instance_id: NodeId) -> tuple[FileMetadata, str]: resp_json = res.json()["items"][0] return FileMetadata.load(resp_json), resp_json["uploadUrl"] - def upload(self, fail_on_errors: bool = True, timeout: Optional[float] = None) -> None: + def upload(self, fail_on_errors: bool = True, timeout: float | None = None) -> None: """ Wait for all uploads to finish """ @@ -554,9 +544,9 @@ def __enter__(self) -> "IOFileUploadQueue": def __exit__( self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], + exc_type: Type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, ) -> None: """ Wraps around stop method, for use as context manager @@ -595,13 +585,13 @@ class FileUploadQueue(IOFileUploadQueue): def __init__( self, cdf_client: CogniteClient, - post_upload_function: Optional[Callable[[List[FileMetadataOrCogniteExtractorFile]], None]] = None, - max_queue_size: Optional[int] = None, - max_upload_interval: Optional[int] = None, + post_upload_function: Callable[[list[FileMetadataOrCogniteExtractorFile]], None] | None = None, + max_queue_size: int | None = None, + max_upload_interval: int | None = None, trigger_log_level: str = "DEBUG", - thread_name: Optional[str] = None, + thread_name: str | None = None, overwrite_existing: bool = False, - cancellation_token: Optional[CancellationToken] = None, + cancellation_token: CancellationToken | None = None, ): # Super sets post_upload and threshold super().__init__( @@ -617,7 +607,7 @@ def __init__( def add_to_upload_queue( self, file_meta: FileMetadataOrCogniteExtractorFile, - file_name: Union[str, PathLike], + file_name: str | PathLike, ) -> None: """ Add file to upload queue. The queue will be uploaded if the queue size is larger than the threshold @@ -652,12 +642,12 @@ class BytesUploadQueue(IOFileUploadQueue): def __init__( self, cdf_client: CogniteClient, - post_upload_function: Optional[Callable[[List[FileMetadataOrCogniteExtractorFile]], None]] = None, - max_queue_size: Optional[int] = None, + post_upload_function: Callable[[list[FileMetadataOrCogniteExtractorFile]], None] | None = None, + max_queue_size: int | None = None, trigger_log_level: str = "DEBUG", - thread_name: Optional[str] = None, + thread_name: str | None = None, overwrite_existing: bool = False, - cancellation_token: Optional[CancellationToken] = None, + cancellation_token: CancellationToken | None = None, ) -> None: super().__init__( cdf_client, diff --git a/cognite/extractorutils/uploader/raw.py b/cognite/extractorutils/uploader/raw.py index de0719e9..1299fac4 100644 --- a/cognite/extractorutils/uploader/raw.py +++ b/cognite/extractorutils/uploader/raw.py @@ -13,7 +13,7 @@ # limitations under the License. from types import TracebackType -from typing import Any, Callable, Dict, List, Optional, Type +from typing import Any, Callable, Type import arrow from arrow import Arrow @@ -56,12 +56,12 @@ class RawUploadQueue(AbstractUploadQueue): def __init__( self, cdf_client: CogniteClient, - post_upload_function: Optional[Callable[[List[Any]], None]] = None, - max_queue_size: Optional[int] = None, - max_upload_interval: Optional[int] = None, + post_upload_function: Callable[[list[Any]], None] | None = None, + max_queue_size: int | None = None, + max_upload_interval: int | None = None, trigger_log_level: str = "DEBUG", - thread_name: Optional[str] = None, - cancellation_token: Optional[CancellationToken] = None, + thread_name: str | None = None, + cancellation_token: CancellationToken | None = None, ): # Super sets post_upload and thresholds super().__init__( @@ -73,7 +73,7 @@ def __init__( thread_name, cancellation_token, ) - self.upload_queue: Dict[str, Dict[str, List[TimestampedObject]]] = {} + self.upload_queue: dict[str, dict[str, list[TimestampedObject]]] = {} # It is a hack since Prometheus client registers metrics on object creation, so object has to be created once self.rows_queued = RAW_UPLOADER_ROWS_QUEUED @@ -119,7 +119,7 @@ def upload(self) -> None: max_delay=RETRY_MAX_DELAY, backoff=RETRY_BACKOFF_FACTOR, ) - def _upload_batch(database: str, table: str, patch: List[Row]) -> None: + def _upload_batch(database: str, table: str, patch: list[Row]) -> None: # Upload self.cdf_client.raw.rows.insert(db_name=database, table_name=table, row=patch, ensure_parent=True) @@ -133,7 +133,7 @@ def _upload_batch(database: str, table: str, patch: List[Row]) -> None: # Deduplicate # In case of duplicate keys, the first key is preserved, and the last value is preserved. - patch: Dict[str, Row] = {r.payload.key: r.payload for r in rows} + patch: dict[str, Row] = {r.payload.key: r.payload for r in rows} self.rows_duplicates.labels(_labels).inc(len(rows) - len(patch)) _upload_batch(database=database, table=table, patch=list(patch.values())) @@ -162,7 +162,7 @@ def __enter__(self) -> "RawUploadQueue": return self def __exit__( - self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + self, exc_type: Type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None ) -> None: """ Wraps around stop method, for use as context manager diff --git a/cognite/extractorutils/uploader/time_series.py b/cognite/extractorutils/uploader/time_series.py index c67340e4..8eb69c62 100644 --- a/cognite/extractorutils/uploader/time_series.py +++ b/cognite/extractorutils/uploader/time_series.py @@ -15,7 +15,7 @@ import math from datetime import datetime from types import TracebackType -from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union +from typing import Any, Callable, Type from cognite.client import CogniteClient from cognite.client.data_classes import ( @@ -50,13 +50,13 @@ MAX_DATAPOINT_VALUE = 1e100 MIN_DATAPOINT_VALUE = -1e100 -TimeStamp = Union[int, datetime] +TimeStamp = int | datetime -DataPointWithoutStatus = Union[Tuple[TimeStamp, float], Tuple[TimeStamp, str], Tuple[TimeStamp, int]] -FullStatusCode = Union[StatusCode, int] -DataPointWithStatus = Union[Tuple[TimeStamp, float, FullStatusCode], Tuple[TimeStamp, str, FullStatusCode]] -DataPoint = Union[DataPointWithoutStatus, DataPointWithStatus] -DataPointList = List[DataPoint] +DataPointWithoutStatus = tuple[TimeStamp, float] | tuple[TimeStamp, str] | tuple[TimeStamp, int] +FullStatusCode = StatusCode | int +DataPointWithStatus = tuple[TimeStamp, float, FullStatusCode] | tuple[TimeStamp, str, FullStatusCode] +DataPoint = DataPointWithoutStatus | DataPointWithStatus +DataPointList = list[DataPoint] def default_time_series_factory(external_id: str, datapoints: DataPointList) -> TimeSeries: @@ -103,14 +103,14 @@ class TimeSeriesUploadQueue(AbstractUploadQueue): def __init__( self, cdf_client: CogniteClient, - post_upload_function: Optional[Callable[[List[Dict[str, Union[str, DataPointList]]]], None]] = None, - max_queue_size: Optional[int] = None, - max_upload_interval: Optional[int] = None, + post_upload_function: Callable[[list[dict[str, str | DataPointList]]], None] | None = None, + max_queue_size: int | None = None, + max_upload_interval: int | None = None, trigger_log_level: str = "DEBUG", - thread_name: Optional[str] = None, - create_missing: Union[Callable[[str, DataPointList], TimeSeries], bool] = False, - data_set_id: Optional[int] = None, - cancellation_token: Optional[CancellationToken] = None, + thread_name: str | None = None, + create_missing: Callable[[str, DataPointList], TimeSeries] | bool = False, + data_set_id: int | None = None, + cancellation_token: CancellationToken | None = None, ): # Super sets post_upload and threshold super().__init__( @@ -132,14 +132,14 @@ def __init__( self.create_missing = True self.missing_factory = create_missing - self.upload_queue: Dict[EitherId, DataPointList] = {} + self.upload_queue: dict[EitherId, DataPointList] = {} self.points_queued = TIMESERIES_UPLOADER_POINTS_QUEUED self.points_written = TIMESERIES_UPLOADER_POINTS_WRITTEN self.queue_size = TIMESERIES_UPLOADER_QUEUE_SIZE self.data_set_id = data_set_id - def _verify_datapoint_time(self, time: Union[int, float, datetime, str]) -> bool: + def _verify_datapoint_time(self, time: int | float | datetime | str) -> bool: if isinstance(time, int) or isinstance(time, float): return not math.isnan(time) and time >= MIN_DATAPOINT_TIMESTAMP elif isinstance(time, str): @@ -147,7 +147,7 @@ def _verify_datapoint_time(self, time: Union[int, float, datetime, str]) -> bool else: return time.timestamp() * 1000.0 >= MIN_DATAPOINT_TIMESTAMP - def _verify_datapoint_value(self, value: Union[int, float, datetime, str]) -> bool: + def _verify_datapoint_value(self, value: int | float | datetime | str) -> bool: if isinstance(value, float): return not ( math.isnan(value) or math.isinf(value) or value > MAX_DATAPOINT_VALUE or value < MIN_DATAPOINT_VALUE @@ -171,7 +171,7 @@ def _is_datapoint_valid( return True def add_to_upload_queue( - self, *, id: Optional[int] = None, external_id: Optional[str] = None, datapoints: Optional[DataPointList] = None + self, *, id: int | None = None, external_id: str | None = None, datapoints: DataPointList | None = None ) -> None: """ Add data points to upload queue. The queue will be uploaded if the queue size is larger than the threshold @@ -180,7 +180,7 @@ def add_to_upload_queue( Args: id: Internal ID of time series. Either this or external_id must be set. external_id: External ID of time series. Either this or external_id must be set. - datapoints: List of data points to add + datapoints: list of data points to add """ datapoints = datapoints or [] old_len = len(datapoints) @@ -219,7 +219,7 @@ def upload(self) -> None: max_delay=RETRY_MAX_DELAY, backoff=RETRY_BACKOFF_FACTOR, ) - def _upload_batch(upload_this: List[Dict], retries: int = 5) -> List[Dict]: + def _upload_batch(upload_this: list[dict], retries: int = 5) -> list[dict]: if len(upload_this) == 0: return upload_this @@ -241,14 +241,14 @@ def _upload_batch(upload_this: List[Dict], retries: int = 5) -> List[Dict]: create_these_ids = set( [id_dict["externalId"] for id_dict in ex.not_found if "externalId" in id_dict] ) - datapoints_lists: Dict[str, DataPointList] = { + datapoints_lists: dict[str, DataPointList] = { ts_dict["externalId"]: ts_dict["datapoints"] for ts_dict in upload_this if ts_dict["externalId"] in create_these_ids } self.logger.info(f"Creating {len(create_these_ids)} time series") - to_create: List[TimeSeries] = [ + to_create: list[TimeSeries] = [ self.missing_factory(external_id, datapoints_lists[external_id]) for external_id in create_these_ids ] @@ -317,7 +317,7 @@ def __enter__(self) -> "TimeSeriesUploadQueue": return self def __exit__( - self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + self, exc_type: Type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None ) -> None: """ Wraps around stop method, for use as context manager @@ -343,13 +343,13 @@ class SequenceUploadQueue(AbstractUploadQueue): def __init__( self, cdf_client: CogniteClient, - post_upload_function: Optional[Callable[[List[Any]], None]] = None, - max_queue_size: Optional[int] = None, - max_upload_interval: Optional[int] = None, + post_upload_function: Callable[[list[Any]], None] | None = None, + max_queue_size: int | None = None, + max_upload_interval: int | None = None, trigger_log_level: str = "DEBUG", - thread_name: Optional[str] = None, + thread_name: str | None = None, create_missing: bool = False, - cancellation_token: Optional[CancellationToken] = None, + cancellation_token: CancellationToken | None = None, ): """ Args: @@ -374,15 +374,15 @@ def __init__( thread_name, cancellation_token, ) - self.upload_queue: Dict[EitherId, SequenceRows] = {} - self.sequence_metadata: Dict[EitherId, Dict[str, Union[str, int, float]]] = {} - self.sequence_asset_external_ids: Dict[EitherId, str] = {} - self.sequence_dataset_external_ids: Dict[EitherId, str] = {} - self.sequence_names: Dict[EitherId, str] = {} - self.sequence_descriptions: Dict[EitherId, str] = {} - self.column_definitions: Dict[EitherId, List[Dict[str, str]]] = {} - self.asset_ids: Dict[str, int] = {} - self.dataset_ids: Dict[str, int] = {} + self.upload_queue: dict[EitherId, SequenceRows] = {} + self.sequence_metadata: dict[EitherId, dict[str, str | int | float]] = {} + self.sequence_asset_external_ids: dict[EitherId, str] = {} + self.sequence_dataset_external_ids: dict[EitherId, str] = {} + self.sequence_names: dict[EitherId, str] = {} + self.sequence_descriptions: dict[EitherId, str] = {} + self.column_definitions: dict[EitherId, list[dict[str, str]]] = {} + self.asset_ids: dict[str, int] = {} + self.dataset_ids: dict[str, int] = {} self.create_missing = create_missing self.points_queued = SEQUENCES_UPLOADER_POINTS_QUEUED @@ -391,13 +391,13 @@ def __init__( def set_sequence_metadata( self, - metadata: Dict[str, Union[str, int, float]], - id: Optional[int] = None, - external_id: Optional[str] = None, - asset_external_id: Optional[str] = None, - dataset_external_id: Optional[str] = None, - name: Optional[str] = None, - description: Optional[str] = None, + metadata: dict[str, str | int | float], + id: int | None = None, + external_id: str | None = None, + asset_external_id: str | None = None, + dataset_external_id: str | None = None, + name: str | None = None, + description: str | None = None, ) -> None: """ Set sequence metadata. Metadata will be cached until the sequence is created. The metadata will be updated @@ -426,7 +426,7 @@ def set_sequence_metadata( self.sequence_descriptions[either_id] = description def set_sequence_column_definition( - self, col_def: List[Dict[str, str]], id: Optional[int] = None, external_id: Optional[str] = None + self, col_def: list[dict[str, str]], id: int | None = None, external_id: str | None = None ) -> None: """ Set sequence column definition @@ -443,16 +443,14 @@ def set_sequence_column_definition( def add_to_upload_queue( self, - rows: Union[ - Dict[int, List[Union[int, float, str]]], - List[Tuple[int, Union[int, float, str]]], - List[Dict[str, Any]], - SequenceData, - SequenceRows, - ], - column_external_ids: Optional[List[dict]] = None, - id: Optional[int] = None, - external_id: Optional[str] = None, + rows: dict[int, list[int | float | str]] + | list[tuple[int, int | float | str]] + | list[dict[str, Any]] + | SequenceData + | SequenceRows, + column_external_ids: list[dict] | None = None, + id: int | None = None, + external_id: str | None = None, ) -> None: """ Add sequence rows to upload queue. Mirrors implementation of SequenceApi.insert. Inserted rows will be @@ -461,7 +459,7 @@ def add_to_upload_queue( Args: rows: The rows to be inserted. Can either be a list of tuples, a list of ["rownumber": ..., "values": ...] objects, a dictionary of rowNumber: data, or a SequenceData object. - column_external_ids: List of external id for the columns of the sequence + column_external_ids: list of external id for the columns of the sequence id: Sequence internal ID Use if external_id is None external_id: Sequence external ID @@ -477,7 +475,7 @@ def add_to_upload_queue( # Already in the desired format pass elif isinstance(rows, (dict, list)): - rows_raw: List[Dict[str, Any]] + rows_raw: list[dict[str, Any]] if isinstance(rows, dict): rows_raw = [{"rowNumber": row_number, "values": values} for row_number, values in rows.items()] elif isinstance(rows, list) and rows and isinstance(rows[0], (tuple, list)): @@ -658,7 +656,7 @@ def __enter__(self) -> "SequenceUploadQueue": return self def __exit__( - self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + self, exc_type: Type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None ) -> None: """ Wraps around stop method, for use as context manager diff --git a/cognite/extractorutils/uploader_extractor.py b/cognite/extractorutils/uploader_extractor.py index 71aa1302..72c843be 100644 --- a/cognite/extractorutils/uploader_extractor.py +++ b/cognite/extractorutils/uploader_extractor.py @@ -15,9 +15,10 @@ """ A module containing a slightly more advanced base extractor class, sorting a generic output into upload queues. """ + from dataclasses import dataclass from types import TracebackType -from typing import Any, Callable, Iterable, List, Optional, Type, TypeVar +from typing import Any, Callable, Iterable, Type, TypeVar from more_itertools import peekable @@ -41,10 +42,11 @@ class QueueConfigClass: @dataclass class UploaderExtractorConfig(BaseConfig): - queues: Optional[QueueConfigClass] + queues: QueueConfigClass | None UploaderExtractorConfigClass = TypeVar("UploaderExtractorConfigClass", bound=UploaderExtractorConfig) +RunHandle = Callable[[CogniteClient, AbstractStateStore, UploaderExtractorConfigClass, CancellationToken], None] class UploaderExtractor(Extractor[UploaderExtractorConfigClass]): @@ -76,19 +78,17 @@ def __init__( *, name: str, description: str, - version: Optional[str] = None, - run_handle: Optional[ - Callable[[CogniteClient, AbstractStateStore, UploaderExtractorConfigClass, CancellationToken], None] - ] = None, + version: str | None = None, + run_handle: RunHandle | None = None, config_class: Type[UploaderExtractorConfigClass], - metrics: Optional[BaseMetrics] = None, + metrics: BaseMetrics | None = None, use_default_state_store: bool = True, - cancellation_token: Optional[CancellationToken] = None, - config_file_path: Optional[str] = None, + cancellation_token: CancellationToken | None = None, + config_file_path: str | None = None, continuous_extractor: bool = False, heartbeat_waiting_time: int = 600, handle_interrupts: bool = True, - middleware: Optional[List[Callable[[dict], dict]]] = None, + middleware: list[Callable[[dict], dict]] | None = None, ): super(UploaderExtractor, self).__init__( name=name, @@ -170,7 +170,7 @@ def __enter__(self) -> "UploaderExtractor": return self def __exit__( - self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + self, exc_type: Type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None ) -> bool: self.event_queue.__exit__(exc_type, exc_val, exc_tb) self.raw_queue.__exit__(exc_type, exc_val, exc_tb) diff --git a/cognite/extractorutils/uploader_types.py b/cognite/extractorutils/uploader_types.py index 47e9bc37..5e595524 100644 --- a/cognite/extractorutils/uploader_types.py +++ b/cognite/extractorutils/uploader_types.py @@ -1,27 +1,19 @@ -import sys -from typing import Iterable, List, Optional, Union +from typing import Iterable, TypeAlias from cognite.client.data_classes import Event as _Event from cognite.client.data_classes import Row as _Row - -if sys.version_info >= (3, 10): - from typing import TypeAlias -else: - from typing_extensions import TypeAlias - - from cognite.extractorutils.uploader.time_series import DataPoint class InsertDatapoints: - def __init__(self, *, id: Optional[int] = None, external_id: Optional[str] = None, datapoints: List[DataPoint]): + def __init__(self, *, id: int | None = None, external_id: str | None = None, datapoints: list[DataPoint]): self.id = id self.external_id = external_id self.datapoints = datapoints class RawRow: - def __init__(self, db_name: str, table_name: str, row: Union[_Row, Iterable[_Row]]): + def __init__(self, db_name: str, table_name: str, row: _Row | Iterable[_Row]): self.db_name = db_name self.table_name = table_name if isinstance(row, Iterable): @@ -32,4 +24,4 @@ def __init__(self, db_name: str, table_name: str, row: Union[_Row, Iterable[_Row Event: TypeAlias = _Event -CdfTypes = Union[Event, Iterable[Event], RawRow, Iterable[RawRow], InsertDatapoints, Iterable[InsertDatapoints]] +CdfTypes = Event | Iterable[Event] | RawRow | Iterable[RawRow] | InsertDatapoints | Iterable[InsertDatapoints] diff --git a/cognite/extractorutils/util.py b/cognite/extractorutils/util.py index 08fec7a2..e454ad56 100644 --- a/cognite/extractorutils/util.py +++ b/cognite/extractorutils/util.py @@ -25,7 +25,7 @@ from io import RawIOBase from threading import Thread from time import time -from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Tuple, Type, TypeVar, Union +from typing import Any, Callable, Generator, Iterable, Type, TypeVar from decorator import decorator @@ -89,7 +89,7 @@ class EitherId: TypeError: If none of both of id types are set. """ - def __init__(self, **kwargs: Union[int, str, None]): + def __init__(self, **kwargs: int | str | None): internal_id = kwargs.get("id") external_id = kwargs.get("externalId") or kwargs.get("external_id") @@ -105,8 +105,8 @@ def __init__(self, **kwargs: Union[int, str, None]): if external_id is not None and not isinstance(external_id, str): raise TypeError("External IDs must be strings") - self.internal_id: Optional[int] = internal_id - self.external_id: Optional[str] = external_id + self.internal_id: int | None = internal_id + self.external_id: str | None = external_id def type(self) -> str: """ @@ -117,7 +117,7 @@ def type(self) -> str: """ return "id" if self.internal_id is not None else "externalId" - def content(self) -> Union[int, str]: + def content(self) -> int | str: """ Get the value of the ID @@ -249,7 +249,7 @@ def heartbeat_loop() -> None: ############################## _logger.info(f"Starting to run function: {input_function.__name__}") - heartbeat_thread: Optional[Thread] = None + heartbeat_thread: Thread | None = None try: heartbeat_thread = Thread(target=heartbeat_loop, name="HeartbeatLoop", daemon=True) heartbeat_thread.start() @@ -313,12 +313,12 @@ def throttled_loop(target_time: int, cancellation_token: CancellationToken) -> G def _retry_internal( f: Callable[..., _T2], cancellation_token: CancellationToken, - exceptions: Union[Tuple[Type[Exception], ...], Dict[Type[Exception], Callable[[Exception], bool]]], + exceptions: tuple[Type[Exception], ...] | dict[Type[Exception], Callable[[Exception], bool]], tries: int, delay: float, - max_delay: Optional[float], + max_delay: float | None, backoff: float, - jitter: Union[float, Tuple[float, float]], + jitter: float | tuple[float, float], ) -> _T2: logger = logging.getLogger(__name__) @@ -366,13 +366,13 @@ def _retry_internal( def retry( - cancellation_token: Optional[CancellationToken] = None, - exceptions: Union[Tuple[Type[Exception], ...], Dict[Type[Exception], Callable[[Any], bool]]] = (Exception,), + cancellation_token: CancellationToken | None = None, + exceptions: tuple[Type[Exception], ...] | dict[Type[Exception], Callable[[Any], bool]] = (Exception,), tries: int = 10, delay: float = 1, - max_delay: Optional[float] = 60, + max_delay: float | None = 60, backoff: float = 2, - jitter: Union[float, Tuple[float, float]] = (0, 2), + jitter: float | tuple[float, float] = (0, 2), ) -> Callable[[Callable[..., _T2]], Callable[..., _T2]]: """ Returns a retry decorator. @@ -414,8 +414,8 @@ def retry_decorator(f: Callable[..., _T2], *fargs: Any, **fkwargs: Any) -> _T2: def requests_exceptions( - status_codes: Optional[List[int]] = None, -) -> Dict[Type[Exception], Callable[[Any], bool]]: + status_codes: list[int] | None = None, +) -> dict[Type[Exception], Callable[[Any], bool]]: """ Retry exceptions from using the ``requests`` library. This will retry all connection and HTTP errors matching the given status codes. @@ -448,8 +448,8 @@ def handle_http_errors(exception: RequestException) -> bool: def httpx_exceptions( - status_codes: Optional[List[int]] = None, -) -> Dict[Type[Exception], Callable[[Any], bool]]: + status_codes: list[int] | None = None, +) -> dict[Type[Exception], Callable[[Any], bool]]: """ Retry exceptions from using the ``httpx`` library. This will retry all connection and HTTP errors matching the given status codes. @@ -482,8 +482,8 @@ def handle_http_errors(exception: HTTPError) -> bool: def cognite_exceptions( - status_codes: Optional[List[int]] = None, -) -> Dict[Type[Exception], Callable[[Any], bool]]: + status_codes: list[int] | None = None, +) -> dict[Type[Exception], Callable[[Any], bool]]: """ Retry exceptions from using the Cognite SDK. This will retry all connection and HTTP errors matching the given status codes. @@ -569,9 +569,7 @@ def truncate_byte_len(item: str, ln: int) -> str: class BufferedReadWithLength(io.BufferedReader): - def __init__( - self, raw: RawIOBase, buffer_size: int, len: int, on_close: Optional[Callable[[], None]] = None - ) -> None: + def __init__(self, raw: RawIOBase, buffer_size: int, len: int, on_close: Callable[[], None] | None = None) -> None: super().__init__(raw, buffer_size) # Do not remove even if it appears to be unused. :P # Requests uses this to add the content-length header, which is necessary for writing to files in azure clusters @@ -588,7 +586,7 @@ def iterable_to_stream( iterator: Iterable[bytes], file_size_bytes: int, buffer_size: int = io.DEFAULT_BUFFER_SIZE, - on_close: Optional[Callable[[], None]] = None, + on_close: Callable[[], None] | None = None, ) -> BufferedReadWithLength: class ChunkIteratorStream(io.RawIOBase): def __init__(self) -> None: