tweak logic for using cached SDK client in config reloader (#313)
* tweak logic for using cached SDK client in config reloader

* refactor _inject_cognite using asdict, add _use_cached_cognite_client

* happy mypy happy life

* replace fixed time.sleep calls with dynamic polling (util fn await_is_uploaded_status)
haakonvt authored Mar 19, 2024
1 parent cdd52fd commit 31ad9e4
Showing 12 changed files with 75 additions and 66 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,13 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## 7.1.1

### Fixed

* Enhancement of `7.0.5`: more use cases covered (to avoid repeatedly fetching a new token).
* When using remote config, the full local `idp-authentication` is now injected (some fields were missing).

## 7.1.0

### Added
2 changes: 1 addition & 1 deletion cognite/extractorutils/__init__.py
@@ -16,5 +16,5 @@
Cognite extractor utils is a Python package that simplifies the development of new extractors.
"""

__version__ = "7.1.0"
__version__ = "7.1.1"
from .base import Extractor
6 changes: 3 additions & 3 deletions cognite/extractorutils/base.py
@@ -152,17 +152,17 @@ def reload_config_callback(self) -> None:
def _reload_config(self) -> None:
self.logger.info("Config file has changed")

if self.reload_config_action == ReloadConfigAction.REPLACE_ATTRIBUTE:
if self.reload_config_action is ReloadConfigAction.REPLACE_ATTRIBUTE:
self.logger.info("Loading in new config file")
self.config_resolver.accept_new_config()
self.config = self.config_resolver.config
Extractor._config_singleton = self.config # type: ignore

elif self.reload_config_action == ReloadConfigAction.SHUTDOWN:
elif self.reload_config_action is ReloadConfigAction.SHUTDOWN:
self.logger.info("Shutting down, expecting to be restarted")
self.cancellation_token.cancel()

elif self.reload_config_action == ReloadConfigAction.CALLBACK:
elif self.reload_config_action is ReloadConfigAction.CALLBACK:
self.logger.info("Loading in new config file")
self.config_resolver.accept_new_config()
self.config = self.config_resolver.config
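
A note on the `==` to `is` switch above: `Enum` members are singletons, so identity comparison is the idiomatic way to match them and cannot be affected by a custom `__eq__`. A minimal sketch with placeholder member values (the real enum's values are not shown in this diff):

```python
from enum import Enum

class ReloadConfigAction(Enum):
    REPLACE_ATTRIBUTE = "replace"  # placeholder values, for illustration only
    SHUTDOWN = "shutdown"
    CALLBACK = "callback"

action = ReloadConfigAction.SHUTDOWN

# Each member is created exactly once at class definition time, so identity
# comparison is exact and reads as intent: "is this that exact member?"
assert action is ReloadConfigAction.SHUTDOWN
assert action == ReloadConfigAction.SHUTDOWN  # equivalent for plain Enums, but weaker
```
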
6 changes: 3 additions & 3 deletions cognite/extractorutils/configtools/elements.py
@@ -372,7 +372,7 @@ def get_data_set(self, cdf_client: CogniteClient) -> Optional[DataSet]:

return cdf_client.data_sets.retrieve(
id=self.data_set.either_id.internal_id,
external_id=self.data_set.either_id.external_id, # type: ignore
external_id=self.data_set.either_id.external_id,
)

def get_extraction_pipeline(self, cdf_client: CogniteClient) -> Optional[ExtractionPipeline]:
@@ -381,8 +381,8 @@ def get_extraction_pipeline(self, cdf_client: CogniteClient) -> Optional[Extract

either_id = self.extraction_pipeline.either_id
extraction_pipeline = cdf_client.extraction_pipelines.retrieve(
id=either_id.internal_id, # type: ignore
external_id=either_id.external_id, # type: ignore
id=either_id.internal_id,
external_id=either_id.external_id,
)
if extraction_pipeline is None:
raise ValueError(f"Extraction pipeline with {either_id.type()} {either_id.content()} not found")
51 changes: 25 additions & 26 deletions cognite/extractorutils/configtools/loaders.py
@@ -13,6 +13,7 @@
# limitations under the License.

import argparse
import dataclasses
import json
import logging
import os
@@ -21,7 +22,7 @@
from enum import Enum
from hashlib import sha256
from pathlib import Path
from typing import Any, Callable, Dict, Generic, Iterable, Optional, TextIO, Type, TypeVar, Union
from typing import Any, Callable, Dict, Generic, Iterable, Optional, TextIO, Type, TypeVar, Union, cast

import dacite
import yaml
@@ -349,45 +350,43 @@ def from_cli(
return cls(args.config[0], config_type)

def _inject_cognite(self, local_part: _BaseConfig, remote_part: Dict[str, Any]) -> Dict[str, Any]:
if "cognite" not in remote_part:
remote_part["cognite"] = {}

remote_part["cognite"]["idp-authentication"] = {
"client_id": local_part.cognite.idp_authentication.client_id,
"scopes": local_part.cognite.idp_authentication.scopes,
"secret": local_part.cognite.idp_authentication.secret,
"tenant": local_part.cognite.idp_authentication.tenant,
"token_url": local_part.cognite.idp_authentication.token_url,
"resource": local_part.cognite.idp_authentication.resource,
"authority": local_part.cognite.idp_authentication.authority,
}
# We can not dump 'local_part.cognite' directly because e.g. 'data_set' may be set remote only...
remote_part.setdefault("cognite", {})
remote_part["cognite"]["idp_authentication"] = dataclasses.asdict(local_part.cognite.idp_authentication)
remote_part["cognite"]["extraction-pipeline"] = dataclasses.asdict(
local_part.cognite.extraction_pipeline # type: ignore [arg-type]
)

if local_part.cognite.host is not None:
remote_part["cognite"]["host"] = local_part.cognite.host
remote_part["cognite"]["project"] = local_part.cognite.project

# Ignoring None type, extraction pipelines is required at this point
remote_part["cognite"]["extraction-pipeline"] = {}
remote_part["cognite"]["extraction-pipeline"]["id"] = local_part.cognite.extraction_pipeline.id # type: ignore
remote_part["cognite"]["extraction-pipeline"][
"external_id"
] = local_part.cognite.extraction_pipeline.external_id # type: ignore

return remote_part

def _use_cached_cognite_client(self, tmp_config: _BaseConfig) -> bool:
# Ideally we'd check tmp_config == self._config, but due to 'is_remote & _inject_...', this is not
# reliable to avoid new unneeded instantiations of CogniteClient:
return (
self.cognite_client is not None
and self._config is not None
and tmp_config.cognite.host == self._config.cognite.host
and tmp_config.cognite.project == self._config.cognite.project
and tmp_config.cognite.idp_authentication == self._config.cognite.idp_authentication
)

def _resolve_config(self) -> None:
self._reload_file()

if self.is_remote:
_logger.debug("Loading remote config file")
tmp_config: _BaseConfig = load_yaml(self._config_text, _BaseConfig) # type: ignore
if self.cognite_client is None or self._config is None or tmp_config.cognite != self._config.cognite:
# Credentials towards CDF may have changed, instantiate (and store) a new client:
client = tmp_config.cognite.get_cognite_client("config_resolver")
self.cognite_client = client
else:
if self._use_cached_cognite_client(tmp_config):
# Use existing client to avoid invoking a token refresh, if possible. Reason: this is run every 5 min
# by default ('ConfigReloader' thread) which for certain OAuth providers like Auth0, incurs a cost:
client = self.cognite_client
client = cast(CogniteClient, self.cognite_client)
else:
# Credentials towards CDF may have changed, instantiate (and store) a new client:
client = self.cognite_client = tmp_config.cognite.get_cognite_client("config_resolver")

response = client.extraction_pipelines.config.retrieve(
tmp_config.cognite.get_extraction_pipeline(client).external_id # type: ignore # ignoring extpipe None
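
Two ideas in this file are worth spelling out. First, `dataclasses.asdict` recurses into nested dataclasses, so dumping `idp_authentication` this way injects every field, where the old hand-written dict literal had dropped some. Second, dataclasses get a field-wise `__eq__` for free, so `_use_cached_cognite_client` can compare the whole `idp_authentication` block with a single `==` and skip rebuilding the client (and its token fetch) when credentials are unchanged. A simplified sketch with stand-in types, not the real config classes:

```python
import dataclasses
from typing import Any, Dict, List, Optional

@dataclasses.dataclass
class IdpAuthentication:  # stand-in; the real class has more fields
    client_id: str
    token_url: str
    scopes: List[str]

@dataclasses.dataclass
class CogniteConfig:  # stand-in
    host: Optional[str]
    project: str
    idp_authentication: IdpAuthentication

def inject_cognite(local: CogniteConfig, remote: Dict[str, Any]) -> Dict[str, Any]:
    # asdict recurses into nested dataclasses, so every idp field is copied;
    # a hand-written dict literal can silently go stale as fields are added.
    remote.setdefault("cognite", {})
    remote["cognite"]["idp_authentication"] = dataclasses.asdict(local.idp_authentication)
    return remote

def use_cached_client(new: CogniteConfig, old: CogniteConfig) -> bool:
    # Dataclass __eq__ compares all fields, so one check covers the whole
    # idp block. host/project are checked separately because other 'cognite'
    # fields (e.g. the data set) may legitimately differ on the remote side,
    # which is why comparing the full configs is not a reliable cache test.
    return (
        new.host == old.host
        and new.project == old.project
        and new.idp_authentication == old.idp_authentication
    )
```
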
6 changes: 3 additions & 3 deletions cognite/extractorutils/metrics.py
@@ -384,7 +384,7 @@ def _init_cdf(self) -> None:
data_set_id = dataset.id

for metric in REGISTRY.collect():
if type(metric) == Metric and metric.type in ["gauge", "counter"]:
if type(metric) is Metric and metric.type in ["gauge", "counter"]:
external_id = self.external_id_prefix + metric.name

time_series.append(
Expand All @@ -393,8 +393,8 @@ def _init_cdf(self) -> None:
name=metric.name,
legacy_name=external_id,
description=metric.documentation,
asset_id=asset_id, # type: ignore # this is optional. Type hint in SDK is wrong
data_set_id=data_set_id, # type: ignore # this is optional. Type hint in SDK is wrong
asset_id=asset_id,
data_set_id=data_set_id,
)
)

3 changes: 1 addition & 2 deletions cognite/extractorutils/statestore.py
@@ -389,8 +389,7 @@ def impl() -> None:
if self._initialized and not force:
return

# ignore type since list _is_ optional, sdk types are wrong
rows = self._cdf_client.raw.rows.list(db_name=self.database, table_name=self.table, limit=None) # type: ignore
rows = self._cdf_client.raw.rows.list(db_name=self.database, table_name=self.table, limit=None)

with self.lock:
self._local_state.clear()
25 changes: 12 additions & 13 deletions cognite/extractorutils/uploader/time_series.py
@@ -522,8 +522,8 @@ def _upload_single(either_id: EitherId, upload_this: SequenceData) -> SequenceDa

try:
self.cdf_client.sequences.data.insert(
id=either_id.internal_id, # type: ignore
external_id=either_id.external_id, # type: ignore
id=either_id.internal_id,
external_id=either_id.external_id,
rows=upload_this,
column_external_ids=None,
)
@@ -534,8 +534,8 @@ def _upload_single(either_id: EitherId, upload_this: SequenceData) -> SequenceDa

# Retry
self.cdf_client.sequences.data.insert(
id=either_id.internal_id, # type: ignore
external_id=either_id.external_id, # type: ignore
id=either_id.internal_id,
external_id=either_id.external_id,
rows=upload_this,
column_external_ids=None,
)
@@ -553,7 +553,6 @@ def _upload_single(either_id: EitherId, upload_this: SequenceData) -> SequenceDa
self._resolve_dataset_ids()

for either_id, upload_this in self.upload_queue.items():
_labels = str(either_id.content())
_upload_single(either_id, upload_this)
self.points_written.inc()

@@ -582,11 +581,11 @@ def _create_or_update(self, either_id: EitherId) -> None:
try:
seq = self.cdf_client.sequences.create(
Sequence(
id=either_id.internal_id, # type: ignore # these are optional, the SDK types are wrong
external_id=either_id.external_id, # type: ignore
name=self.sequence_names.get(either_id, None), # type: ignore
description=self.sequence_descriptions.get(either_id, None), # type: ignore
metadata=self.sequence_metadata.get(either_id, None), # type: ignore
id=either_id.internal_id,
external_id=either_id.external_id,
name=self.sequence_names.get(either_id, None),
description=self.sequence_descriptions.get(either_id, None),
metadata=self.sequence_metadata.get(either_id, None),
asset_id=self.asset_ids.get(self.sequence_asset_external_ids.get(either_id, None), None), # type: ignore
data_set_id=self.dataset_ids.get(self.sequence_dataset_external_ids.get(either_id, None), None), # type: ignore
columns=column_def, # type: ignore # We already checked for None, mypy is wrong
Expand All @@ -595,9 +594,9 @@ def _create_or_update(self, either_id: EitherId) -> None:

except CogniteDuplicatedError:
self.logger.info("Sequnce already exist: {}".format(either_id))
seq = self.cdf_client.sequences.retrieve(
id=either_id.internal_id, # type: ignore # these are optional, the SDK types are wrong
external_id=either_id.external_id, # type: ignore
seq = self.cdf_client.sequences.retrieve( # type: ignore [assignment]
id=either_id.internal_id,
external_id=either_id.external_id,
)

# Update definition of cached sequence
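
The one ignore this file gains, `# type: ignore [assignment]`, acknowledges that `retrieve` returns an `Optional` while `seq` is expected to be set once the duplicate is fetched. Where silencing mypy is undesirable, explicit narrowing is an alternative; a generic sketch (the helper is hypothetical, not part of the library):

```python
from typing import Optional, TypeVar

T = TypeVar("T")

def require(value: Optional[T], what: str) -> T:
    # Narrows Optional[T] to T for mypy, failing loudly instead of
    # suppressing the error and risking an AttributeError later.
    if value is None:
        raise RuntimeError(f"{what} was expected to exist but was not found")
    return value

# Hypothetical usage:
# seq = require(self.cdf_client.sequences.retrieve(...), "sequence")
```
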
8 changes: 4 additions & 4 deletions cognite/extractorutils/uploader_types.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import sys
from typing import Iterable, List, Optional, Union

from cognite.client.data_classes import Event as _Event
from cognite.client.data_classes import Row as _Row

try:
from typing import TypeAlias # type: ignore
except ImportError:
# Backport for python < 3.10
if sys.version_info >= (3, 10):
from typing import TypeAlias
else:
from typing_extensions import TypeAlias


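
The rewritten import block uses the version-gate pattern that type checkers understand statically: mypy follows `sys.version_info` branches, while the old `try/except ImportError` needed a blanket `# type: ignore` that becomes unused on Python 3.10+, exactly the kind of thing the new `warn_unused_ignores` setting (see pyproject.toml below) would flag. A small sketch of how the import is then used; the alias name here is hypothetical:

```python
import sys
from typing import Tuple, Union

if sys.version_info >= (3, 10):
    from typing import TypeAlias
else:
    # Backport for python < 3.10
    from typing_extensions import TypeAlias

# Hypothetical alias for illustration; the real module defines
# aliases for the uploader input types (Event, Row, etc.).
DataPointTuple: TypeAlias = Union[Tuple[int, float], Tuple[int, str]]
```
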
4 changes: 2 additions & 2 deletions cognite/extractorutils/util.py
@@ -424,7 +424,7 @@ def my_function() -> None:
"""
status_codes = status_codes or [408, 425, 429, 500, 502, 503, 504]
# types ignored, since they are not installed as we don't depend on the package
from requests.exceptions import HTTPError, RequestException # type: ignore
from requests.exceptions import HTTPError, RequestException

def handle_http_errors(exception: RequestException) -> bool:
if isinstance(exception, HTTPError):
@@ -458,7 +458,7 @@ def my_function() -> None:
"""
status_codes = status_codes or [408, 425, 429, 500, 502, 503, 504]
# types ignored, since they are not installed as we don't depend on the package
from httpx import HTTPError, HTTPStatusError # type: ignore
from httpx import HTTPError, HTTPStatusError

def handle_http_errors(exception: HTTPError) -> bool:
if isinstance(exception, HTTPStatusError):
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "cognite-extractor-utils"
version = "7.1.0"
version = "7.1.1"
description = "Utilities for easier development of extractors for CDF"
authors = ["Mathias Lohne <[email protected]>"]
license = "Apache-2.0"
@@ -47,6 +47,8 @@ follow_imports = "normal"
namespace_packages = true
explicit_package_bases = true
show_error_codes = true
warn_redundant_casts = true
warn_unused_ignores = true
exclude = "tests/*"

[tool.poetry.dependencies]
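
The two new mypy flags back up the `# type: ignore` cleanup across this commit: `warn_unused_ignores` errors on ignore comments that no longer suppress anything (for instance after an SDK type-hint fix), and `warn_redundant_casts` does the same for `cast` calls to a type the expression already has. A minimal sketch of code each flag would reject under these settings:

```python
from typing import Optional, cast

def width(x: Optional[int]) -> int:
    # This line type-checks cleanly, so mypy reports:
    #   Unused "type: ignore" comment
    return x or 0  # type: ignore[return-value]

def same(n: int) -> int:
    # Casting to the type the expression already has; mypy reports:
    #   Redundant cast to "int"
    return cast(int, n)
```
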
19 changes: 11 additions & 8 deletions tests/tests_integration/test_cdf_upload_integration.py
@@ -121,6 +121,12 @@ def tearDown(self):
except CogniteNotFoundError:
pass

def await_is_uploaded_status(self, external_id):
for _ in range(10):
if self.client.files.retrieve(external_id=external_id).uploaded:
return
time.sleep(1)

def test_raw_upload_queue(self):
queue = RawUploadQueue(cdf_client=self.client, max_queue_size=500)

@@ -145,7 +151,6 @@ def test_raw_upload_queue(self):
[{k: r.__dict__[k] for k in ["key", "columns"]} for r in uploaded],
[{k: r.__dict__[k] for k in ["key", "columns"]} for r in rows_in_cdf],
)
pass

def test_time_series_upload_queue1(self):
created = self.client.time_series.create(
@@ -310,6 +315,8 @@ def test_file_upload_queue(self):

queue.upload()

self.await_is_uploaded_status(self.file1)
self.await_is_uploaded_status(self.file2)
file1 = self.client.files.download_bytes(external_id=self.file1)
file2 = self.client.files.download_bytes(external_id=self.file2)
file3 = self.client.files.retrieve(external_id=self.empty_file)
Expand All @@ -331,7 +338,8 @@ def test_bytes_upload_queue(self):
)

queue.upload()

self.await_is_uploaded_status(self.file1)
self.await_is_uploaded_status(self.file2)
file1 = self.client.files.download_bytes(external_id=self.file1)
file2 = self.client.files.download_bytes(external_id=self.file2)

@@ -349,12 +357,7 @@ def test_big_file_upload_queue(self):

queue.upload()

for _ in range(10):
file = self.client.files.retrieve(external_id=self.bigfile)
if file.uploaded:
break
time.sleep(1)

self.await_is_uploaded_status(self.bigfile)
bigfile = self.client.files.download_bytes(external_id=self.bigfile)

assert len(bigfile) == 10_000_000
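
The new `await_is_uploaded_status` helper replaces fixed sleeps with bounded polling; note that it gives up silently after roughly ten seconds and lets the subsequent download call surface any failure. A generalized version of the same idea, as a sketch:

```python
import time
from typing import Callable

def wait_until(predicate: Callable[[], bool], timeout: float = 10.0, interval: float = 1.0) -> bool:
    # Poll until the predicate holds or the deadline passes. Returning a bool
    # lets the caller assert immediately, instead of failing later on a
    # download with a less obvious error message.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return False

# Hypothetical usage mirroring the test helper:
# assert wait_until(lambda: client.files.retrieve(external_id=xid).uploaded)
```
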
