Enhance dictionary API with pagination
drscholly committed Dec 2, 2024
1 parent 79cb0b8 commit ec6938e
Showing 6 changed files with 151 additions and 73 deletions.
16 changes: 8 additions & 8 deletions tests/test_copy_dictionary.py
@@ -1,31 +1,31 @@
-from toolbox.api.datagalaxy_api_dictionary import DataGalaxyApiDictionary
+from toolbox.api.datagalaxy_api_modules import DataGalaxyApiModules
 from toolbox.commands.copy_dictionary import copy_dictionary
 from toolbox.api.datagalaxy_api_workspaces import DataGalaxyApiWorkspace
 import pytest as pytest
 
 
 # Mocks
 
-def mock_list_sources_on_source_workspace(self, workspace_name):
+def mock_list_objects_on_source_workspace(self, workspace_name):
     if workspace_name == 'workspace_source':
-        return ['source1', 'source2', 'source3']
+        return [['object1', 'object2', 'object3']]
     return []
 
 
 # Scenarios
 
-def test_copy_sources_when_workspace_source_does_not_exist(mocker):
+def test_copy_dictionary_when_workspace_target_does_not_exist(mocker):
     # GIVEN
     workspaces = mocker.patch.object(DataGalaxyApiWorkspace, 'list_workspaces', autospec=True)
     workspaces.return_value = ['workspace_source']
     workspace_source_mock = mocker.patch.object(DataGalaxyApiWorkspace, 'get_workspace', autospec=True)
     workspace_source_mock.return_value = None
-    sources_on_source_workspace_mock = mocker.patch.object(
-        DataGalaxyApiDictionary,
-        'list_sources',
+    objects_on_source_workspace_mock = mocker.patch.object(
+        DataGalaxyApiModules,
+        'list_objects',
         autospec=True
     )
-    sources_on_source_workspace_mock.side_effect = mock_list_sources_on_source_workspace
+    objects_on_source_workspace_mock.side_effect = mock_list_objects_on_source_workspace
 
     # ASSERT / VERIFY
     with pytest.raises(Exception, match='workspace workspace_source does not exist'):
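Note on the reworked mock: list_objects returns a list of pages, each page itself a list of objects, which is why the mock wraps its three objects in double brackets. A minimal sketch of consuming that shape (values are illustrative, not part of the commit):

    pages = [['object1', 'object2', 'object3']]  # one page holding three objects
    flat = [obj for page in pages for obj in page]
    assert flat == ['object1', 'object2', 'object3']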
33 changes: 15 additions & 18 deletions tests/test_delete_dictionary.py
@@ -1,20 +1,17 @@
 from toolbox.api.datagalaxy_api_workspaces import DataGalaxyApiWorkspace
-from toolbox.api.datagalaxy_api_dictionary import DataGalaxyApiDictionary
+from toolbox.api.datagalaxy_api_modules import DataGalaxyApiModules
 from toolbox.commands.delete_dictionary import delete_dictionary
 
 
 # Mock
-def mock_list_sources(self, data_type):
-    if self.url == 'url':
-        return [
-            {
-                'id': '1',
-                'name': 'Object',
-                'description': 'An object in the dictionary'
-            }
-        ]
-
-    return []
+mock_list_objects = [[
+    {
+        'id': '1',
+        'name': 'Object',
+        'path': "\\\\Object",
+        'description': 'Just a simple object'
+    }
+]]
 
 
 # Scenarios
@@ -23,14 +20,14 @@ def test_delete_dictionary(mocker):
     workspaces = mocker.patch.object(DataGalaxyApiWorkspace, 'list_workspaces', autospec=True)
     workspaces.return_value = ['workspace']
     workspace_mock = mocker.patch.object(DataGalaxyApiWorkspace, 'get_workspace', autospec=True)
-    workspace_mock.return_value = {'isVersioningEnabled': False}
-    dictionary_list_mock = mocker.patch.object(DataGalaxyApiDictionary, 'list_sources', autospec=True)
-    dictionary_list_mock.side_effect = mock_list_sources
-    delete_dictionary_mock = mocker.patch.object(DataGalaxyApiDictionary, 'delete_sources', autospec=True)
-    delete_dictionary_mock.return_value = True
+    workspace_mock.return_value = {'name': 'workspace', 'isVersioningEnabled': False}
+    objects_list_mock = mocker.patch.object(DataGalaxyApiModules, 'list_objects', autospec=True)
+    objects_list_mock.return_value = mock_list_objects
+    delete_objects_mock = mocker.patch.object(DataGalaxyApiModules, 'delete_objects', autospec=True)
+    delete_objects_mock.return_value = True
 
     # THEN
     result = delete_dictionary(url='url', token='token', workspace_name="workspace")
 
     # ASSERT / VERIFY
-    assert result is True
+    assert result == 0
20 changes: 20 additions & 0 deletions toolbox/api/datagalaxy_api.py
@@ -122,6 +122,26 @@ def find_root_objects(objects: list) -> list:
     return root_objects
 
 
+def create_batches(input_arrays, max_size=5000):
+    batches = []  # This will hold the completed batches
+    current_batch = []  # Temporary list used to build the current batch
+
+    for arr in input_arrays:
+        for obj in arr:  # Add each object from the input page
+            if len(current_batch) < max_size:
+                current_batch.append(obj)
+            else:
+                # When the current batch reaches max_size, save it and start a new one
+                batches.append(current_batch)
+                current_batch = [obj]
+
+    # Add the remaining objects in `current_batch` if it's not empty
+    if current_batch:
+        batches.append(current_batch)
+
+    return batches
+
+
 def to_bulk_tree(properties: list) -> list:
     if properties is None or len(properties) == 0:
         logging.warn("Cannot bulk upsert an empty list of objects")
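For intuition, a small sketch of how create_batches behaves on paged input; the values and the lowered max_size are illustrative only:

    # Two input pages; cap batches at 3 objects instead of the default 5000.
    pages = [['a', 'b'], ['c', 'd', 'e']]
    batches = create_batches(pages, max_size=3)
    assert batches == [['a', 'b', 'c'], ['d', 'e']]  # flattened, then re-chunked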
64 changes: 62 additions & 2 deletions toolbox/api/datagalaxy_api_modules.py
@@ -1,16 +1,18 @@
 import logging
 import requests as requests
-from toolbox.api.datagalaxy_api import build_bulktree, prune_tree, remove_technology_code
+from toolbox.api.datagalaxy_api import build_bulktree, prune_tree, remove_technology_code, create_batches
 from typing import Optional
 
 
 class DataGalaxyApiModules:
     def __init__(self, url: str, token: str, workspace: dict, module: str):
-        if module not in ["Glossary", "DataProcessing", "Uses"]:
+        if module not in ["Glossary", "Dictionary", "DataProcessing", "Uses"]:
             raise Exception('The specified module does not exist.')
         self.module = module
         if module == "Glossary":
             self.route = "properties"
+        if module == "Dictionary":
+            self.route = "sources"
         if module == "DataProcessing":
             self.route = "dataProcessing"
         if module == "Uses":
@@ -75,6 +77,39 @@ def list_object_items(self, workspace_name: str, parent_id: str) -> list:
             result = result + body_json['results']
         return result
 
+    # This is a specific request for Dictionary
+    def list_children_objects(self, workspace_name: str, parent_id: str, object_type: str, include_links=False) -> list:
+        if object_type not in ["containers", "structures", "fields"]:
+            raise Exception('The specified object type does not exist.')
+
+        version_id = self.workspace['defaultVersionId']
+        if include_links is True:
+            params = {'versionId': version_id, 'limit': '5000', 'includeLinks': 'true', 'parentId': parent_id}
+        else:
+            params = {'versionId': version_id, 'limit': '5000', 'includeAttributes': 'true', 'parentId': parent_id}
+        headers = {'Authorization': f"Bearer {self.token}"}
+        response = requests.get(f"{self.url}/{object_type}", params=params, headers=headers)
+        code = response.status_code
+        body_json = response.json()
+        if code != 200:
+            raise Exception(body_json['error'])
+        logging.info(
+            f'list_children_objects - {len(body_json["results"])} objects found on '
+            f'workspace: {workspace_name} of type: {object_type} in module {self.module}')
+        result_pages = [body_json['results']]
+        next_page = body_json["next_page"]
+        while next_page is not None:
+            logging.info('Fetching another page from the API...')
+            headers = {'Authorization': f"Bearer {self.token}"}
+            response = requests.get(next_page, headers=headers)
+            body_json = response.json()
+            logging.info(
+                f'list_children_objects - {len(body_json["results"])} objects found on '
+                f'workspace: {workspace_name} of type: {object_type} in module {self.module}')
+            next_page = body_json["next_page"]
+            result_pages.append(body_json['results'])
+        return result_pages
+
     def bulk_upsert_tree(self, workspace_name: str, objects: list, tag_value: Optional[str]) -> int:
         # Objects can be in pages, so one POST request per page
         for page in objects:
@@ -103,6 +138,31 @@ def bulk_upsert_tree(self, workspace_name: str, objects: list, tag_value: Optional[str]) -> int:

         return 200
 
+    def bulk_upsert_source_tree(self, workspace_name: str, source: dict, objects: list, tag_value: Optional[str]) -> int:
+        batches = create_batches(objects)
+
+        # One bulktree call per batch
+        for batch in batches:
+            bulktree = build_bulktree([source] + batch)
+            if len(bulktree) > 1:
+                raise Exception(f"Problem while creating the bulktree for source {source['name']}")
+            bulktree = bulktree[0]
+
+            if tag_value is not None:
+                bulktree = prune_tree(bulktree, tag_value)
+
+            version_id = self.workspace['defaultVersionId']
+            headers = {'Authorization': f"Bearer {self.token}"}
+            response = requests.post(f"{self.url}/{self.route}/bulktree/{version_id}", json=bulktree, headers=headers)
+            code = response.status_code
+            body_json = response.json()
+            if 200 <= code < 300:
+                logging.info(f'bulk_upsert_source_tree - {body_json}')
+            if 400 <= code < 500:
+                raise Exception(body_json['error'])
+
+        return 200
+
     def delete_objects(self, workspace_name: str, ids: list) -> int:
         if len(ids) < 1:
             logging.warn(f'Nothing to delete on workspace "{workspace_name}" in module {self.module}, aborting.')
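The pagination in list_children_objects follows a cursor pattern: fetch the first page, then keep requesting the URL the API returns in next_page until none is returned. A condensed standalone sketch of that loop (fetch_all_pages is illustrative, not part of the toolbox, and assumes the same results/next_page response shape as the code above):

    import requests

    def fetch_all_pages(url: str, token: str, params: dict) -> list:
        headers = {'Authorization': f"Bearer {token}"}
        body_json = requests.get(url, params=params, headers=headers).json()
        pages = [body_json['results']]
        next_page = body_json.get('next_page')  # .get() used defensively in this sketch
        while next_page is not None:
            body_json = requests.get(next_page, headers=headers).json()
            pages.append(body_json['results'])
            next_page = body_json.get('next_page')
        return pages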
53 changes: 27 additions & 26 deletions toolbox/commands/copy_dictionary.py
@@ -1,7 +1,6 @@
 from typing import Optional
 
-from toolbox.api.datagalaxy_api import DataGalaxyBulkResult
-from toolbox.api.datagalaxy_api_dictionary import DataGalaxyApiDictionary
+from toolbox.api.datagalaxy_api_modules import DataGalaxyApiModules
 from toolbox.api.datagalaxy_api_workspaces import DataGalaxyApiWorkspace
 
 
@@ -11,7 +10,7 @@ def copy_dictionary(url_source: str,
                     token_target: Optional[str],
                     workspace_source_name: str,
                     workspace_target_name: str,
-                    tag_value: Optional[str]) -> DataGalaxyBulkResult:
+                    tag_value: Optional[str]) -> int:
     if token_target is None:
         token_target = token_source
 
@@ -34,39 +33,41 @@
     if target_workspace is None:
         raise Exception(f'workspace {workspace_target_name} does not exist')
 
-    dictionary_on_source_workspace = DataGalaxyApiDictionary(
+    source_dictionary_api = DataGalaxyApiModules(
         url=url_source,
         token=token_source,
-        workspace=workspaces_api_on_source_env.get_workspace(workspace_source_name)
+        workspace=workspaces_api_on_source_env.get_workspace(workspace_source_name),
+        module="Dictionary"
     )
-    dictionary_on_target_workspace = DataGalaxyApiDictionary(
+    target_dictionary_api = DataGalaxyApiModules(
         url=url_target,
         token=token_target,
-        workspace=target_workspace
+        workspace=target_workspace,
+        module="Dictionary"
     )
 
-    # fetching sources from workspace_source
-    source_sources = dictionary_on_source_workspace.list_sources(workspace_source_name)
-    source_all = source_sources
+    # fetch sources (databases) from source workspace
+    source_sources = source_dictionary_api.list_objects(workspace_source_name)
 
-    # fetching containers from workspace_source
-    source_containers = dictionary_on_source_workspace.list_containers(workspace_source_name)
-    source_all = source_all + source_containers
+    for page in source_sources:
+        for source in page:
+            # fetch children objects for each source
+            source_id = source['id']
+            print(f"source: {source['name']} of id {source_id}")
+            containers = source_dictionary_api.list_children_objects(workspace_source_name, source_id, "containers")
+            structures = source_dictionary_api.list_children_objects(workspace_source_name, source_id, "structures")
+            fields = source_dictionary_api.list_children_objects(workspace_source_name, source_id, "fields")
+            # todo : fetch primary keys and foreign keys
 
-    # fetching structures from workspace_source
-    source_structures = dictionary_on_source_workspace.list_structures(workspace_source_name)
-    source_all = source_all + source_structures
+            # bulk upsert source tree
+            target_dictionary_api.bulk_upsert_source_tree(
+                workspace_name=workspace_target_name,
+                source=source,
+                objects=containers + structures + fields,
+                tag_value=tag_value
+            )
 
-    # fetching fields from workspace_source
-    source_fields = dictionary_on_source_workspace.list_fields(workspace_source_name)
-    source_all = source_all + source_fields
-
-    # copy all the dictionary in workspace_target
-    return dictionary_on_target_workspace.bulk_upsert_sources_tree(
-        workspace_name=workspace_target_name,
-        sources=source_all,
-        tag_value=tag_value
-    )
+    return 0
 
 
 def copy_dictionary_parse(subparsers):
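A hypothetical end-to-end invocation of the reworked command (URLs, tokens, and workspace names are placeholders; keyword arguments are used so the sketch does not depend on parameter order):

    result = copy_dictionary(
        url_source='https://dg.example.com/api/v2',
        token_source='source-token',
        url_target='https://dg.example.com/api/v2',
        token_target=None,  # falls back to token_source
        workspace_source_name='workspace_source',
        workspace_target_name='workspace_target',
        tag_value=None,  # or a tag name used to prune the bulktree
    )
    assert result == 0  # the command now reports success as 0 rather than a bulk result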
38 changes: 19 additions & 19 deletions toolbox/commands/delete_dictionary.py
@@ -1,12 +1,11 @@
-from toolbox.api.datagalaxy_api import DataGalaxyBulkResult
-from toolbox.api.datagalaxy_api_dictionary import DataGalaxyApiDictionary
+from toolbox.api.datagalaxy_api_modules import DataGalaxyApiModules
 from toolbox.api.datagalaxy_api_workspaces import DataGalaxyApiWorkspace
-import logging
+from toolbox.api.datagalaxy_api import find_root_objects
 
 
 def delete_dictionary(url: str,
                       token: str,
-                      workspace_name: str) -> DataGalaxyBulkResult:
+                      workspace_name: str) -> int:
 
     workspaces_api = DataGalaxyApiWorkspace(
         url=url,
@@ -17,24 +16,25 @@
     if not workspace:
         raise Exception(f'workspace {workspace_name} does not exist')
 
-    # fetch the dictionary properties from the workspace_source
-    dictionary_api = DataGalaxyApiDictionary(
+    # fetching objects from source workspace
+    module_api = DataGalaxyApiModules(
         url=url,
         token=token,
-        workspace=workspace
+        workspace=workspace,
+        module="Dictionary"
     )
-    sources = dictionary_api.list_sources(workspace_name)
-
-    ids = list(map(lambda object: object['id'], sources))
-
-    if ids is None or len(ids) < 1:
-        logging.warn("Nothing to delete in this module")
-        return 0
-
-    return dictionary_api.delete_sources(
-        workspace_name=workspace_name,
-        ids=ids
-    )
+    objects = module_api.list_objects(
+        workspace_name)
+
+    for page in objects:
+        root_objects = find_root_objects(page)
+        ids = list(map(lambda object: object['id'], root_objects))
+        module_api.delete_objects(
+            workspace_name=workspace_name,
+            ids=ids
+        )
+
+    return 0
 
 
 def delete_dictionary_parse(subparsers):
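The delete path now collects only the root objects of each page and deletes those, which assumes the API cascades the deletion to children. A sketch of that assumption (object shapes mirror the test mock; the path-based root detection attributed to find_root_objects is inferred, not confirmed by this diff):

    page = [
        {'id': '1', 'name': 'MyDatabase', 'path': "\\\\MyDatabase"},
        {'id': '2', 'name': 'MySchema', 'path': "\\\\MyDatabase\\\\MySchema"},
    ]
    roots = find_root_objects(page)  # expected to keep only the top-level source
    ids = list(map(lambda object: object['id'], roots))  # ['1']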
