Skip to content

Commit

Permalink
Merge pull request #23 from JacobCallahan/resume
Browse files Browse the repository at this point in the history
Add the ability for candore to resume an extraction after an error
  • Loading branch information
jyejare authored Aug 30, 2024
2 parents 2633855 + 870326c commit 0b9908c
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 43 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Introduction

`Candore` is the command line interface data integrity tool. The tool is build to verify the change made in a product has any impact on data in product.
`Candore` is a command line data integrity tool. It is built to verify whether a change made to a product has any impact on the product's data.

**The change** could be:
- Upgrade of the product to new version
Expand Down
7 changes: 5 additions & 2 deletions candore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from candore.modules.extractor import Extractor
from candore.modules.finder import Finder
from candore.modules.report import Reporting
from candore.config import candore_settings


class Candore:
Expand All @@ -22,7 +21,9 @@ def __init__(self, settings):
def list_endpoints(self):
return self.api_lister.lister_endpoints()

async def save_all_entities(self, mode, output_file, full, max_pages=None, skip_percent=None):
async def save_all_entities(
self, mode, output_file, full, max_pages=None, skip_percent=None, resume=None
):
"""Save all the entities to a json file
:param mode: Pre or Post
Expand All @@ -39,6 +40,8 @@ async def save_all_entities(self, mode, output_file, full, max_pages=None, skip_
extractor.full = True
extractor.max_pages = max_pages
extractor.skip_percent = skip_percent
if resume:
extractor.load_resume_info()
data = await extractor.extract_all_entities()
if hasattr(self.settings, 'rpms'):
data.update({'installed_rpms': await extractor.extract_all_rpms()})
Expand Down
6 changes: 4 additions & 2 deletions candore/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def candore(ctx, version, settings_file, components_file, conf_dir):
settings=candore_settings(
option_settings_file=settings_file,
option_components_file=components_file,
conf_dir=conf_dir
conf_dir=conf_dir,
)
)
ctx.__dict__["candore"] = candore_obj
Expand All @@ -49,8 +49,9 @@ def apis(ctx):
@click.option("--full", is_flag=True, help="Extract data from all the pages of a component")
@click.option("--max-pages", type=int, help="The maximum number of pages to extract per entity")
@click.option("--skip-percent", type=int, help="The percentage of pages to skip per entity")
@click.option("--resume", is_flag=True, help="Resume the extraction from the last completed entity")
@click.pass_context
def extract(ctx, mode, output, full, max_pages, skip_percent):
def extract(ctx, mode, output, full, max_pages, skip_percent, resume):
loop = asyncio.get_event_loop()
candore_obj = ctx.parent.candore
loop.run_until_complete(
Expand All @@ -60,6 +61,7 @@ def extract(ctx, mode, output, full, max_pages, skip_percent):
full=full,
max_pages=max_pages,
skip_percent=skip_percent,
resume=resume,
)
)

Expand Down
4 changes: 2 additions & 2 deletions candore/modules/comparator.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ def custom_key(elem):
def compare_all_pres_with_posts(self, pre_data, post_data, unique_key="", var_details=None):
if unique_key:
self.big_key.append(unique_key)
if isinstance(pre_data, dict):
if isinstance(pre_data, dict) and post_data:
self._is_data_type_dict(pre_data, post_data, unique_key=unique_key)
elif isinstance(pre_data, list):
elif isinstance(pre_data, list) and post_data:
self._is_data_type_list(pre_data, post_data, unique_key=unique_key)
else:
if pre_data != post_data:
Expand Down
109 changes: 83 additions & 26 deletions candore/modules/extractor.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import asyncio # noqa: F401
import json
import math
from functools import cached_property
from candore.modules.ssh import Session
import re
from functools import cached_property
from pathlib import Path

import aiohttp

from candore.modules.ssh import Session

# Max observed request duration in testing was approximately 888 seconds
# so we set the timeout to 2000 seconds to be overly safe
EXTENDED_TIMEOUT = aiohttp.ClientTimeout(total=2000, connect=60, sock_read=2000, sock_connect=60)
RESUME_FILE = Path("_resume_info.json")
PARTIAL_FILE = Path("_partial_extraction.json")


class Extractor:
Expand All @@ -27,6 +33,12 @@ def __init__(self, settings, apilister=None):
self.apilister = apilister
self.full = False
self.semaphore = asyncio.Semaphore(self.settings.candore.max_connections)
self._all_data = {}
self._api_endpoints = None
self._completed_entities = []
self._current_entity = None
self._current_endpoint = None
self._retry_limit = 3

@cached_property
def dependent_components(self):
Expand All @@ -40,7 +52,9 @@ def ignore_components(self):

@cached_property
def api_endpoints(self):
    """Return the listing of API endpoints, computed at most once.

    The manual ``_api_endpoints`` check (on top of ``cached_property``)
    lets a previously saved listing — pre-seeded by ``load_resume_info``
    — take precedence over a fresh call to the lister.
    """
    if not self._api_endpoints:
        self._api_endpoints = self.apilister.lister_endpoints()
    return self._api_endpoints

async def _start_session(self):
if not self.client:
Expand All @@ -56,13 +70,37 @@ async def __aenter__(self):

async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Close the HTTP session; on error, persist progress for a resumed run.

    If the context exits with an exception, dump the data collected so far
    and the resume metadata to disk so a later run can pick up where this
    one stopped (see ``load_resume_info``).
    """
    await self._end_session()
    if exc_val:
        # Use the module-level path constants so the writer and the
        # reader (load_resume_info) always agree on the file names.
        with PARTIAL_FILE.open("w") as partial_file:
            json.dump(self._all_data, partial_file)
        with RESUME_FILE.open("w") as resume_file:
            json.dump(self.to_resume_dict(), resume_file, indent=4)

async def _retry_get(self, retries=None, **get_params):
    """Perform a GET request, retrying on aiohttp client errors.

    :param retries: attempts remaining; ``None`` means start at
        ``self._retry_limit``.
    :param get_params: keyword arguments forwarded to ``client.get``.
    :return: tuple of (status code, decoded JSON) — JSON is ``{}`` for
        any non-200 response.
    :raises aiohttp.ClientError: when every retry has been exhausted.
    """
    if retries is None:
        # Must compare against None: `if not retries` would also match an
        # exhausted counter of 0 and reset it to the limit, producing an
        # infinite retry loop on a persistently failing endpoint.
        retries = self._retry_limit
    try:
        async with self.client.get(**get_params) as response:
            if response.status == 200:
                json_data = await response.json()
                return response.status, json_data
            return response.status, {}
    except aiohttp.ClientError:
        if retries > 0:
            return await self._retry_get(retries=retries - 1, **get_params)
        print(
            f"Failed to get data from {get_params.get('url')} "
            f"in {self._retry_limit} retries."
        )
        raise

async def paged_results(self, **get_params):
    """Fetch a single page and return its ``results`` list.

    Returns ``None`` when the request does not come back with HTTP 200.
    """
    status, payload = await self._retry_get(**get_params, timeout=EXTENDED_TIMEOUT)
    if status != 200:
        return None
    return payload.get("results")

async def fetch_page(self, page, _request):
async with self.semaphore:
Expand Down Expand Up @@ -95,18 +133,17 @@ async def fetch_component_entities(self, **comp_params):
_request = {"url": self.base + "/" + endpoint, "params": {}}
if data and dependency:
_request["params"].update({f"{dependency}_id": data})
async with self.client.get(**_request) as response:
if response.status == 200:
results = await response.json()
if "results" in results:
entity_data.extend(results.get("results"))
else:
# Return an empty directory for endpoints
# like services, api etc
# which does not have results
return entity_data
status, results = await self._retry_get(**_request)
if status == 200:
if "results" in results:
entity_data.extend(results.get("results"))
else:
# Return an empty list for endpoints
# like services, api, etc.
# which do not have results
return entity_data
else:
return entity_data
total_pages = results.get("total") // results.get("per_page") + 1
if total_pages > 1:
print(f"Endpoint {endpoint} has {total_pages} pages.")
Expand Down Expand Up @@ -154,11 +191,12 @@ async def component_params(self, component_endpoint):

async def process_entities(self, endpoints):
"""
endpoints = ['katello/api/actiovationkeys']
endpoints = ['katello/api/activationkeys']
"""
comp_data = []
entities = None
for endpoint in endpoints:
self._current_endpoint = endpoint
comp_params = await self.component_params(component_endpoint=endpoint)
if comp_params:
entities = []
Expand All @@ -183,21 +221,40 @@ async def extract_all_entities(self):
:return:
"""
all_data = {}
for component, endpoints in self.api_endpoints.items():
if endpoints:
self._current_entity = component
if endpoints and component not in self._completed_entities:
comp_entities = await self.process_entities(endpoints=endpoints)
all_data[component] = comp_entities
return all_data
self._all_data[component] = comp_entities
self._completed_entities.append(component)
return self._all_data

async def extract_all_rpms(self):
    """Extract all installed RPMs from the server.

    Runs ``rpm -qa`` over SSH, matches each line against the regex from
    ``settings.rpms.regex_pattern``, and builds a dict from the captured
    groups (dropping the final group of each match).

    :return: dict built from the remaining captured groups — assumes the
        configured pattern yields exactly one key group and one value
        group after the last group is dropped; TODO confirm against the
        deployed regex.
    """
    with Session(settings=self.settings) as ssh_client:
        rpms = ssh_client.execute('rpm -qa').stdout.splitlines()
        # Compile once: the pattern is loop-invariant.
        name_version_re = re.compile(rf'{self.settings.rpms.regex_pattern}')
        rpms_matches = (name_version_re.match(rpm) for rpm in rpms)
        rpms_list = [rpm_match.groups()[:-1] for rpm_match in rpms_matches if rpm_match]
        return dict(rpms_list)

def to_resume_dict(self):
    """Snapshot the extractor's progress so a later run can resume it."""
    return dict(
        api_endpoints=self._api_endpoints,
        completed_entities=self._completed_entities,
        current_entity=self._current_entity,
        current_endpoint=self._current_endpoint,
    )

def load_resume_info(self):
    """Restore extraction state saved by a previous failed run.

    Reads the resume metadata and the partially extracted data written on
    error, restores the extractor's internal state, then deletes both
    files so stale state cannot leak into a later run.

    :raises FileNotFoundError: if no resume files exist on disk.
    """
    # json.loads, not json.load: read_text() returns a str, while
    # json.load expects a file object (the original call raised
    # AttributeError at runtime).
    resume_info = json.loads(RESUME_FILE.read_text())
    self._api_endpoints = resume_info["api_endpoints"]
    self._completed_entities = resume_info["completed_entities"]
    self._current_entity = resume_info["current_entity"]
    self._current_endpoint = resume_info["current_endpoint"]
    self._all_data = json.loads(PARTIAL_FILE.read_text())
    RESUME_FILE.unlink()
    PARTIAL_FILE.unlink()
18 changes: 14 additions & 4 deletions candore/modules/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,22 @@ def _generate_csv_report(self, output_file, inverse):
# Convert json to csv and write to output file
csv_writer = csv.writer(output_file.open("w"))
# Table Column Names
columns = ["Path", "Pre-Upgrade", "Post-Upgrade", "Variation?" if not inverse else 'Constant?']
columns = [
"Path",
"Pre-Upgrade",
"Post-Upgrade",
"Variation?" if not inverse else 'Constant?',
]
csv_writer.writerow(columns)
# Writing Rows
for var_path, vals in self.results.items():
csv_writer.writerow([
var_path, vals["pre"], vals["post"],
vals["variation" if not inverse else "constant"]])
csv_writer.writerow(
[
var_path,
vals["pre"],
vals["post"],
vals["variation" if not inverse else "constant"],
]
)
print("Wrote CSV report to {}".format(output_file))
print("CSV report contains {} results".format(len(self.results)))
5 changes: 2 additions & 3 deletions candore/modules/ssh.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from hussh import Connection
from functools import cached_property
from candore.config import candore_settings
from urllib.parse import urlparse

from hussh import Connection


class Session:

def __init__(self, settings=None):
self.settings = settings
self.hostname = urlparse(settings.candore.base_url).hostname
Expand Down
5 changes: 3 additions & 2 deletions candore/modules/variations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
`conf/variations` yaml file and convert them into processable list
"""
from functools import cached_property
from candore.utils import yaml_reader, get_yaml_paths

from candore.utils import get_yaml_paths
from candore.utils import yaml_reader


class Variations:
Expand All @@ -20,7 +22,6 @@ def expected_variations(self):
yaml_data = self.variations.get("expected_variations") if self.variations else None
return get_yaml_paths(yaml_data=yaml_data)


@cached_property
def skipped_variations(self):
yaml_data = self.variations.get("skipped_variations") if self.variations else None
Expand Down
3 changes: 2 additions & 1 deletion candore/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
An utility helpers module
"""
from pathlib import Path

import yaml


Expand Down Expand Up @@ -40,4 +41,4 @@ def get_yaml_paths(yaml_data, prefix="", separator="/"):
paths.extend(get_yaml_paths(item, prefix, separator))
else:
paths.append(f"{prefix}{yaml_data}")
return paths
return paths
3 changes: 3 additions & 0 deletions scripts/gen_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import yaml

KEEP_FIELDS = ["name", "label", "title", "url", "description", "path"]
SKIP_ENTITIES = ["errata", "package_groups", "repository_sets"]
SKIP_DICT = {}
HELP_TEXT = """
This script processes a comparison report, in the form of a csv file, and outputs a constants file.
Expand All @@ -26,6 +27,8 @@

def filter_parts(parts):
    """Decide whether a report path should be kept as a constant.

    :param parts: non-empty sequence of path components; the first element
        is the entity name, the last is the field name.
    :return: True when the entity is not skipped and the field matches one
        of KEEP_FIELDS; None otherwise.
    """
    # The entity check does not depend on the loop variable — evaluate it
    # once up front instead of on every iteration.
    if parts[0] in SKIP_ENTITIES:
        return None
    for check in KEEP_FIELDS:
        if check in parts[-1]:
            return True
    return None

Expand Down

0 comments on commit 0b9908c

Please sign in to comment.