
Improve retries towards CDF (#295)
Add a `cognite_exceptions` template for the `@retry` decorator that only
retries retryable failures, and use it in the upload queues and state
stores.

Also move each retried implementation into an inner function defined in
the calling method, so that it can access and respect the cancellation
token.
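The template's core idea is to pair each exception type with a predicate that decides whether a given failure is retryable, so that a 503 is retried but a 400 fails fast. A minimal sketch of what `cognite_exceptions` could look like, assuming `retry` accepts a mapping from exception type to predicate; the status-code list and exact signature are illustrative, not taken from the repo:

```python
from typing import Callable, Dict, Type

from cognite.client.exceptions import CogniteAPIError, CogniteConnectionError

# Assumed set of transient HTTP status codes; the real list may differ.
_RETRYABLE_STATUS_CODES = {408, 425, 429, 500, 502, 503, 504}


def cognite_exceptions() -> Dict[Type[Exception], Callable[[Exception], bool]]:
    """Map exception types to predicates deciding whether a failure is retryable."""
    return {
        # Connection-level failures are transient by nature.
        CogniteConnectionError: lambda _: True,
        # API errors are only retried when the status code indicates a transient fault.
        CogniteAPIError: lambda e: getattr(e, "code", None) in _RETRYABLE_STATUS_CODES,
    }
```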
mathialo authored Feb 20, 2024
1 parent a1e673e commit 00d4171
Showing 9 changed files with 364 additions and 311 deletions.
154 changes: 80 additions & 74 deletions cognite/extractorutils/statestore.py
@@ -92,19 +92,17 @@
from types import TracebackType
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Type, Union

-from requests.exceptions import ConnectionError
-
from cognite.client import CogniteClient
-from cognite.client.exceptions import CogniteAPIError, CogniteException
+from cognite.client.exceptions import CogniteAPIError
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.uploader import DataPointList

from ._inner_util import _DecimalDecoder, _DecimalEncoder, _resolve_log_level
-from .util import retry
+from .util import cognite_exceptions, retry

RETRY_BACKOFF_FACTOR = 1.5
-RETRY_MAX_DELAY = 15
-RETRY_DELAY = 5
+RETRY_MAX_DELAY = 60
+RETRY_DELAY = 1
RETRIES = 10


@@ -347,79 +345,87 @@ def __init__(

        self._ensure_table()

-    @retry(
-        exceptions=(CogniteException, ConnectionError),
-        tries=RETRIES,
-        delay=RETRY_DELAY,
-        max_delay=RETRY_MAX_DELAY,
-        backoff=RETRY_BACKOFF_FACTOR,
-    )
    def _ensure_table(self) -> None:
-        try:
-            self._cdf_client.raw.databases.create(self.database)
-        except CogniteAPIError as e:
-            if not e.code == 400:
-                raise e
-        try:
-            self._cdf_client.raw.tables.create(self.database, self.table)
-        except CogniteAPIError as e:
-            if not e.code == 400:
-                raise e
-
-    def initialize(self, force: bool = False) -> None:
-        self._initialize_implementation(force)
-
-    @retry(
-        exceptions=(CogniteException, ConnectionError),
-        tries=RETRIES,
-        delay=RETRY_DELAY,
-        max_delay=RETRY_MAX_DELAY,
-        backoff=RETRY_BACKOFF_FACTOR,
-    )
-    def _initialize_implementation(self, force: bool = False) -> None:
-        """
-        Get all known states.
-        Args:
-            force: Enable re-initialization, ie overwrite when called multiple times
-        """
-        if self._initialized and not force:
-            return
-
-        # ignore type since list _is_ optional, sdk types are wrong
-        rows = self._cdf_client.raw.rows.list(db_name=self.database, table_name=self.table, limit=None)  # type: ignore
-
-        with self.lock:
-            self._local_state.clear()
-            for row in rows:
-                if row.key is None or row.columns is None:
-                    self.logger.warning(f"None encountered in row: {str(row)}")
-                    # should never happen, but type from sdk is optional
-                    continue
-                self._local_state[row.key] = row.columns
-
-        self._initialized = True
+        @retry(
+            exceptions=cognite_exceptions(),
+            cancellation_token=self.cancellation_token,
+            tries=RETRIES,
+            delay=RETRY_DELAY,
+            max_delay=RETRY_MAX_DELAY,
+            backoff=RETRY_BACKOFF_FACTOR,
+        )
+        def impl() -> None:
+            try:
+                self._cdf_client.raw.databases.create(self.database)
+            except CogniteAPIError as e:
+                if not e.code == 400:
+                    raise e
+            try:
+                self._cdf_client.raw.tables.create(self.database, self.table)
+            except CogniteAPIError as e:
+                if not e.code == 400:
+                    raise e
+
+        impl()
+
+    def initialize(self, force: bool = False) -> None:
+        @retry(
+            exceptions=cognite_exceptions(),
+            cancellation_token=self.cancellation_token,
+            tries=RETRIES,
+            delay=RETRY_DELAY,
+            max_delay=RETRY_MAX_DELAY,
+            backoff=RETRY_BACKOFF_FACTOR,
+        )
+        def impl() -> None:
+            """
+            Get all known states.
+            Args:
+                force: Enable re-initialization, ie overwrite when called multiple times
+            """
+            if self._initialized and not force:
+                return
+
+            # ignore type since list _is_ optional, sdk types are wrong
+            rows = self._cdf_client.raw.rows.list(db_name=self.database, table_name=self.table, limit=None)  # type: ignore
+
+            with self.lock:
+                self._local_state.clear()
+                for row in rows:
+                    if row.key is None or row.columns is None:
+                        self.logger.warning(f"None encountered in row: {str(row)}")
+                        # should never happen, but type from sdk is optional
+                        continue
+                    self._local_state[row.key] = row.columns
+
+            self._initialized = True
+
+        impl()

    def synchronize(self) -> None:
-        self._synchronize_implementation()
-
-    @retry(
-        exceptions=(CogniteException, ConnectionError),
-        tries=RETRIES,
-        delay=RETRY_DELAY,
-        max_delay=RETRY_MAX_DELAY,
-        backoff=RETRY_BACKOFF_FACTOR,
-    )
-    def _synchronize_implementation(self) -> None:
-        """
-        Upload local state store to CDF
-        """
-        self._cdf_client.raw.rows.insert(db_name=self.database, table_name=self.table, row=self._local_state)
-        # Create a copy of deleted to facilitate testing (mock library stores list, and as it changes, the assertions
-        # fail)
-        self._cdf_client.raw.rows.delete(db_name=self.database, table_name=self.table, key=[k for k in self._deleted])
-        with self.lock:
-            self._deleted.clear()
+        @retry(
+            exceptions=cognite_exceptions(),
+            cancellation_token=self.cancellation_token,
+            tries=RETRIES,
+            delay=RETRY_DELAY,
+            max_delay=RETRY_MAX_DELAY,
+            backoff=RETRY_BACKOFF_FACTOR,
+        )
+        def impl() -> None:
+            """
+            Upload local state store to CDF
+            """
+            self._cdf_client.raw.rows.insert(db_name=self.database, table_name=self.table, row=self._local_state)
+            # Create a copy of deleted to facilitate testing (mock library stores list, and as it changes, the
+            # assertions fail)
+            self._cdf_client.raw.rows.delete(
+                db_name=self.database, table_name=self.table, key=[k for k in self._deleted]
+            )
+            with self.lock:
+                self._deleted.clear()
+
+        impl()

def __enter__(self) -> "RawStateStore":
"""
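A note on the nested-function pattern above: decorator arguments on an ordinary method are evaluated once, at class-definition time, when no instance exists, so a class-level `@retry` could never see `self.cancellation_token`. Defining the decorated function inside the method binds the current instance's token on every call. A simplified stand-in for `retry` to illustrate the mechanics, using `threading.Event` in place of the repo's `CancellationToken`:

```python
import functools
import threading
from typing import Any, Callable


def retry(cancellation_token: threading.Event, tries: int) -> Callable:
    """Toy retry decorator that gives up early once the token is set."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for attempt in range(tries):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    # Stop retrying when cancellation is requested or tries run out.
                    if cancellation_token.is_set() or attempt == tries - 1:
                        raise

        return wrapper

    return decorator


class Store:
    def __init__(self) -> None:
        self.cancellation_token = threading.Event()

    def synchronize(self) -> None:
        # The token is only reachable here, at call time; a decorator applied
        # at class-definition time could not capture it.
        @retry(cancellation_token=self.cancellation_token, tries=5)
        def impl() -> None:
            ...  # talk to CDF

        impl()
```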
6 changes: 3 additions & 3 deletions cognite/extractorutils/uploader/_base.py
@@ -150,7 +150,7 @@ class TimestampedObject:
    created: Arrow


-RETRY_BACKOFF_FACTOR = 1.5
-RETRY_MAX_DELAY = 15
-RETRY_DELAY = 5
+RETRY_BACKOFF_FACTOR = 2
+RETRY_MAX_DELAY = 60
+RETRY_DELAY = 1
RETRIES = 10
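The new defaults in `_base.py` (`statestore.py` above was retuned similarly) swap the old slow start of a 5 second initial delay with a 15 second cap for a fast first retry with a higher ceiling. Assuming a plain exponential backoff of `delay * backoff ** n` capped at `max_delay` (the repo's `retry` may add jitter or compute the schedule differently), the waits work out as:

```python
RETRY_BACKOFF_FACTOR = 2
RETRY_MAX_DELAY = 60
RETRY_DELAY = 1
RETRIES = 10

# One wait between consecutive tries: 9 waits for 10 tries.
delays = [min(RETRY_DELAY * RETRY_BACKOFF_FACTOR**n, RETRY_MAX_DELAY) for n in range(RETRIES - 1)]
print(delays)  # [1, 2, 4, 8, 16, 32, 60, 60, 60]
```

Short blips now recover within seconds, while sustained outages back off to about one attempt per minute instead of hammering CDF every 15 seconds.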
58 changes: 30 additions & 28 deletions cognite/extractorutils/uploader/assets.py
@@ -16,7 +16,7 @@

from cognite.client import CogniteClient
from cognite.client.data_classes.assets import Asset
-from cognite.client.exceptions import CogniteAPIError, CogniteDuplicatedError
+from cognite.client.exceptions import CogniteDuplicatedError
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.uploader._base import (
RETRIES,
@@ -30,7 +30,7 @@
ASSETS_UPLOADER_QUEUED,
ASSETS_UPLOADER_WRITTEN,
)
-from cognite.extractorutils.util import retry
+from cognite.extractorutils.util import cognite_exceptions, retry


class AssetUploadQueue(AbstractUploadQueue):
@@ -92,9 +92,36 @@ def upload(self) -> None:
"""
Trigger an upload of the queue, clears queue afterwards
"""

@retry(
exceptions=cognite_exceptions(),
cancellation_token=self.cancellation_token,
tries=RETRIES,
delay=RETRY_DELAY,
max_delay=RETRY_MAX_DELAY,
backoff=RETRY_BACKOFF_FACTOR,
)
def _upload_batch() -> None:
try:
self.cdf_client.assets.create(self.upload_queue)
except CogniteDuplicatedError as e:
duplicated_ids = set([dup["externalId"] for dup in e.duplicated if "externalId" in dup])
failed: List[Asset] = [e for e in e.failed]
to_create = []
to_update = []
for asset in failed:
if asset.external_id is not None and asset.external_id in duplicated_ids:
to_update.append(asset)
else:
to_create.append(asset)
if to_create:
self.cdf_client.assets.create(to_create)
if to_update:
self.cdf_client.assets.update(to_update)

        if len(self.upload_queue) > 0:
            with self.lock:
-                self._upload_batch()
+                _upload_batch()

        try:
            self._post_upload(self.upload_queue)
@@ -107,31 +134,6 @@ def upload(self) -> None:
self.upload_queue.clear()
self.queue_size.set(self.upload_queue_size)

-    @retry(
-        exceptions=(CogniteAPIError, ConnectionError),
-        tries=RETRIES,
-        delay=RETRY_DELAY,
-        max_delay=RETRY_MAX_DELAY,
-        backoff=RETRY_BACKOFF_FACTOR,
-    )
-    def _upload_batch(self) -> None:
-        try:
-            self.cdf_client.assets.create(self.upload_queue)
-        except CogniteDuplicatedError as e:
-            duplicated_ids = set([dup["externalId"] for dup in e.duplicated if "externalId" in dup])
-            failed: List[Asset] = [e for e in e.failed]
-            to_create = []
-            to_update = []
-            for asset in failed:
-                if asset.external_id is not None and asset.external_id in duplicated_ids:
-                    to_update.append(asset)
-                else:
-                    to_create.append(asset)
-            if to_create:
-                self.cdf_client.assets.create(to_create)
-            if to_update:
-                self.cdf_client.assets.update(to_update)

def __enter__(self) -> "AssetUploadQueue":
"""
Wraps around start method, for use as context manager
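The `_upload_batch` logic above is a create-then-upsert fallback: try to create everything, and when CDF reports duplicates, update the assets that already exist and re-create the rest. A standalone sketch of just the partitioning step, with a plain dataclass standing in for the SDK's `Asset`:

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class Item:
    external_id: Optional[str]


def partition(failed: List[Item], duplicated: List[Dict[str, str]]) -> Tuple[List[Item], List[Item]]:
    """Split failed items into (to_create, to_update) based on reported duplicates."""
    duplicated_ids = {dup["externalId"] for dup in duplicated if "externalId" in dup}
    to_update = [item for item in failed if item.external_id in duplicated_ids]
    to_create = [item for item in failed if item.external_id not in duplicated_ids]
    return to_create, to_update


to_create, to_update = partition([Item("a"), Item("b"), Item(None)], [{"externalId": "a"}])
print(to_create)  # [Item(external_id='b'), Item(external_id=None)]
print(to_update)  # [Item(external_id='a')]
```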
58 changes: 30 additions & 28 deletions cognite/extractorutils/uploader/events.py
@@ -17,7 +17,7 @@

from cognite.client import CogniteClient
from cognite.client.data_classes import Event
-from cognite.client.exceptions import CogniteAPIError, CogniteDuplicatedError
+from cognite.client.exceptions import CogniteDuplicatedError
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.uploader._base import (
RETRIES,
@@ -31,7 +31,7 @@
EVENTS_UPLOADER_QUEUED,
EVENTS_UPLOADER_WRITTEN,
)
-from cognite.extractorutils.util import retry
+from cognite.extractorutils.util import cognite_exceptions, retry


class EventUploadQueue(AbstractUploadQueue):
@@ -96,11 +96,38 @@ def upload(self) -> None:
"""
Trigger an upload of the queue, clears queue afterwards
"""

@retry(
exceptions=cognite_exceptions(),
cancellation_token=self.cancellation_token,
tries=RETRIES,
delay=RETRY_DELAY,
max_delay=RETRY_MAX_DELAY,
backoff=RETRY_BACKOFF_FACTOR,
)
def _upload_batch() -> None:
try:
self.cdf_client.events.create([e for e in self.upload_queue])
except CogniteDuplicatedError as e:
duplicated_ids = set([dup["externalId"] for dup in e.duplicated if "externalId" in dup])
failed: List[Event] = [e for e in e.failed]
to_create = []
to_update = []
for evt in failed:
if evt.external_id is not None and evt.external_id in duplicated_ids:
to_update.append(evt)
else:
to_create.append(evt)
if to_create:
self.cdf_client.events.create(to_create)
if to_update:
self.cdf_client.events.update(to_update)

        if len(self.upload_queue) == 0:
            return

        with self.lock:
-            self._upload_batch()
+            _upload_batch()

        self.events_written.inc(self.upload_queue_size)

@@ -113,31 +140,6 @@ def upload(self) -> None:
self.upload_queue_size = 0
self.queue_size.set(self.upload_queue_size)

-    @retry(
-        exceptions=(CogniteAPIError, ConnectionError),
-        tries=RETRIES,
-        delay=RETRY_DELAY,
-        max_delay=RETRY_MAX_DELAY,
-        backoff=RETRY_BACKOFF_FACTOR,
-    )
-    def _upload_batch(self) -> None:
-        try:
-            self.cdf_client.events.create([e for e in self.upload_queue])
-        except CogniteDuplicatedError as e:
-            duplicated_ids = set([dup["externalId"] for dup in e.duplicated if "externalId" in dup])
-            failed: List[Event] = [e for e in e.failed]
-            to_create = []
-            to_update = []
-            for evt in failed:
-                if evt.external_id is not None and evt.external_id in duplicated_ids:
-                    to_update.append(evt)
-                else:
-                    to_create.append(evt)
-            if to_create:
-                self.cdf_client.events.create(to_create)
-            if to_update:
-                self.cdf_client.events.update(to_update)

def __enter__(self) -> "EventUploadQueue":
"""
Wraps around start method, for use as context manager
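With the retry moved inside the queues, callers get the resilience for free. A hypothetical usage sketch: the class and method names follow this diff, but the constructor parameters and client configuration are assumptions, so check the repo for exact signatures:

```python
from cognite.client import CogniteClient
from cognite.client.data_classes import Event
from cognite.extractorutils.uploader.events import EventUploadQueue

client = CogniteClient()  # assumes ambient/default client configuration

# Upload happens on exit (and when the queue fills); transient CDF errors
# inside upload() are retried with the backoff configured in uploader/_base.py.
with EventUploadQueue(client, max_queue_size=1000) as queue:
    queue.add_to_upload_queue(Event(external_id="example-event", description="hello"))
```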