From d8f244d377c9884929c1b45630a4595f5a3b2625 Mon Sep 17 00:00:00 2001 From: Mathias Lohne Date: Thu, 16 May 2024 12:46:20 +0200 Subject: [PATCH] Add upload queue for data model nodes and edges (#328) * Add upload queue for data model nodes and edges * Fix type hints for versions before 3.10 --- .../extractorutils/uploader/data_modeling.py | 115 ++++++++++++++++++ pyproject.toml | 1 + tests/conftest.py | 28 +++-- .../test_data_modeling_queue.py | 90 ++++++++++++++ 4 files changed, 226 insertions(+), 8 deletions(-) create mode 100644 cognite/extractorutils/uploader/data_modeling.py create mode 100644 tests/tests_integration/test_data_modeling_queue.py diff --git a/cognite/extractorutils/uploader/data_modeling.py b/cognite/extractorutils/uploader/data_modeling.py new file mode 100644 index 00000000..f000cd18 --- /dev/null +++ b/cognite/extractorutils/uploader/data_modeling.py @@ -0,0 +1,115 @@ +from types import TracebackType +from typing import Any, Callable, List, Optional, Type + +from cognite.client import CogniteClient +from cognite.client.data_classes.data_modeling import EdgeApply, NodeApply +from cognite.extractorutils.threading import CancellationToken +from cognite.extractorutils.uploader._base import ( + RETRIES, + RETRY_BACKOFF_FACTOR, + RETRY_DELAY, + RETRY_MAX_DELAY, + AbstractUploadQueue, +) +from cognite.extractorutils.util import cognite_exceptions, retry + + +class InstanceUploadQueue(AbstractUploadQueue): + def __init__( + self, + cdf_client: CogniteClient, + post_upload_function: Optional[Callable[[List[Any]], None]] = None, + max_queue_size: Optional[int] = None, + max_upload_interval: Optional[int] = None, + trigger_log_level: str = "DEBUG", + thread_name: Optional[str] = None, + cancellation_token: Optional[CancellationToken] = None, + auto_create_start_nodes: bool = True, + auto_create_end_nodes: bool = True, + auto_create_direct_relations: bool = True, + ): + super().__init__( + cdf_client, + post_upload_function, + max_queue_size, + max_upload_interval, + trigger_log_level, + thread_name, + cancellation_token, + ) + + self.auto_create_start_nodes = auto_create_start_nodes + self.auto_create_end_nodes = auto_create_end_nodes + self.auto_create_direct_relations = auto_create_direct_relations + + self.node_queue: List[NodeApply] = [] + self.edge_queue: List[EdgeApply] = [] + + def add_to_upload_queue( + self, + *, + node_data: Optional[List[NodeApply]] = None, + edge_data: Optional[List[EdgeApply]] = None, + ) -> None: + if node_data: + with self.lock: + self.node_queue.extend(node_data) + self.upload_queue_size += len(node_data) + + if edge_data: + with self.lock: + self.edge_queue.extend(edge_data) + self.upload_queue_size += len(edge_data) + + with self.lock: + self._check_triggers() + + def upload(self) -> None: + @retry( + exceptions=cognite_exceptions(), + cancellation_token=self.cancellation_token, + tries=RETRIES, + delay=RETRY_DELAY, + max_delay=RETRY_MAX_DELAY, + backoff=RETRY_BACKOFF_FACTOR, + ) + def upload_batch() -> None: + self.cdf_client.data_modeling.instances.apply( + nodes=self.node_queue, + edges=self.edge_queue, + auto_create_start_nodes=self.auto_create_start_nodes, + auto_create_end_nodes=self.auto_create_end_nodes, + auto_create_direct_relations=self.auto_create_direct_relations, + ) + self.node_queue.clear() + self.edge_queue.clear() + self.upload_queue_size = 0 + + with self.lock: + upload_batch() + + def __enter__(self) -> "InstanceUploadQueue": + """ + Wraps around start method, for use as context manager + + Returns: + self + """ + self.start() + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + """ + Wraps around stop method, for use as context manager + + Args: + exc_type: Exception type + exc_val: Exception value + exc_tb: Traceback + """ + self.stop() diff --git a/pyproject.toml b/pyproject.toml index 20259895..34486044 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -84,6 +84,7 @@ parameterized = "*" requests = "^2.31.0" types-requests = "^2.31.0.20240125" httpx = "^0.27.0" +faker = "^25.2.0" [build-system] requires = ["poetry-core>=1.0.0"] diff --git a/tests/conftest.py b/tests/conftest.py index 23fb01dd..46467ae1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,7 @@ import os from dataclasses import dataclass from enum import Enum -from typing import List, Optional +from typing import List, Optional, Tuple import pytest @@ -10,6 +10,9 @@ from cognite.client.credentials import OAuthClientCredentials from cognite.client.exceptions import CogniteAPIError, CogniteNotFoundError +NUM_NODES = 5000 +NUM_EDGES = NUM_NODES // 100 + class ETestType(Enum): TIME_SERIES = "time_series" @@ -17,6 +20,7 @@ class ETestType(Enum): RAW = "raw" ASSETS = "assets" EVENTS = "events" + DATA_MODELING = "data_modeling" @dataclass @@ -28,7 +32,7 @@ class ParamTest: @pytest.fixture -def set_upload_test(set_test_parameters: ParamTest, set_client: CogniteClient): +def set_upload_test(set_test_parameters: ParamTest, set_client: CogniteClient) -> Tuple[CogniteClient, ParamTest]: client = set_client test_parameter = set_test_parameters clean_test(client, test_parameter) @@ -55,21 +59,29 @@ def set_client() -> CogniteClient: return CogniteClient(client_config) -def clean_test(client: CogniteClient, test_parameter: ParamTest): - if test_parameter.test_type == ETestType.TIME_SERIES: +def clean_test(client: CogniteClient, test_parameter: ParamTest) -> None: + if test_parameter.test_type.value == ETestType.TIME_SERIES.value: client.time_series.delete(external_id=test_parameter.external_ids, ignore_unknown_ids=True) - elif test_parameter.test_type == ETestType.EVENTS: + elif test_parameter.test_type.value == ETestType.EVENTS.value: client.events.delete(external_id=test_parameter.external_ids, ignore_unknown_ids=True) - elif test_parameter.test_type == ETestType.ASSETS: + elif test_parameter.test_type.value == ETestType.ASSETS.value: client.assets.delete(external_id=test_parameter.external_ids, ignore_unknown_ids=True) - elif test_parameter.test_type == ETestType.RAW: + elif test_parameter.test_type.value == ETestType.RAW.value: try: client.raw.tables.delete(test_parameter.database_name, test_parameter.table_name) except CogniteAPIError: pass - elif test_parameter.test_type == ETestType.FILES: + elif test_parameter.test_type.value == ETestType.FILES.value: for file in test_parameter.external_ids: try: client.files.delete(external_id=file) except CogniteNotFoundError: pass + elif test_parameter.test_type.value == ETestType.DATA_MODELING.value: + client.data_modeling.instances.delete( + nodes=[("ExtractorUtilsTests", i) for i in test_parameter.external_ids[0:NUM_NODES]], + edges=[("ExtractorUtilsTests", i) for i in test_parameter.external_ids[NUM_NODES : NUM_NODES + NUM_EDGES]], + ) + client.data_modeling.instances.delete( + nodes=[("ExtractorUtilsTests", test_parameter.external_ids[-1])], + ) diff --git a/tests/tests_integration/test_data_modeling_queue.py b/tests/tests_integration/test_data_modeling_queue.py new file mode 100644 index 00000000..7b0dd36e --- /dev/null +++ b/tests/tests_integration/test_data_modeling_queue.py @@ -0,0 +1,90 @@ +import random +from time import sleep +from typing import Tuple + +import pytest +from faker import Faker + +from cognite.client import CogniteClient +from cognite.client.data_classes.data_modeling import ( + EdgeApply, + NodeApply, + NodeOrEdgeData, + ViewId, +) +from cognite.extractorutils.uploader.data_modeling import InstanceUploadQueue +from tests.conftest import NUM_EDGES, NUM_NODES, ETestType, ParamTest + +fake = Faker() + + +@pytest.fixture +def set_test_parameters() -> ParamTest: + test_id = random.randint(0, 2**31) + return ParamTest( + test_type=ETestType.DATA_MODELING, + external_ids=[f"utils-test-{test_id}-node-{i}" for i in range(NUM_NODES)] + + [f"utils-test-{test_id}-edge-{i}" for i in range(NUM_EDGES)] + + [f"utils-test-{test_id}-movie"], + ) + + +def test_dm_upload_queue(set_upload_test: Tuple[CogniteClient, ParamTest]) -> None: + client, test_params = set_upload_test + queue = InstanceUploadQueue(client) + + for i in range(NUM_NODES): + queue.add_to_upload_queue( + node_data=[ + NodeApply( + space="ExtractorUtilsTests", + external_id=test_params.external_ids[i], + sources=[ + NodeOrEdgeData( + source=ViewId(space="ExtractorUtilsTests", external_id="Actor", version="65ce919fcaad7f"), + properties={"name": fake.name(), "age": fake.random_int(23, 71)}, + ) + ], + ) + ] + ) + + queue.add_to_upload_queue( + node_data=[ + NodeApply( + space="ExtractorUtilsTests", + external_id=test_params.external_ids[-1], + sources=[ + NodeOrEdgeData( + source=ViewId(space="ExtractorUtilsTests", external_id="Movie", version="1bf67849d6a6f5"), + properties={"name": "Test Movie", "imdbRating": 8.2}, + ) + ], + ) + ] + ) + + for i in range(NUM_EDGES): + queue.add_to_upload_queue( + edge_data=[ + EdgeApply( + space="ExtractorUtilsTests", + external_id=test_params.external_ids[NUM_NODES + i], + start_node=("ExtractorUtilsTests", test_params.external_ids[i]), + end_node=("ExtractorUtilsTests", test_params.external_ids[-1]), + type=("ExtractorUtilsTests", "acts-in"), + ) + ] + ) + + queue.upload() + + sleep(10) + + items = client.data_modeling.instances.retrieve( + nodes=[("ExtractorUtilsTests", i) for i in test_params.external_ids[0:NUM_NODES]], + edges=[("ExtractorUtilsTests", i) for i in test_params.external_ids[NUM_NODES : NUM_NODES + NUM_EDGES]], + ) + + assert len(items.nodes) == NUM_NODES + assert len(items.edges) == NUM_EDGES