class TagCache:
    """Remember which tag guesses were already tried per project/version.

    A key task performed in this module is fetching tags from GitHub
    and matching tags to (component) versions. This task includes many
    calls to the GitHub API, which we seek to limit by implementing
    an internal cache and a logic to guess tags, instead of
    performing exhaustive searches.

    Generating many possibly existing tags from an actually
    existing tag may result in a lot of API calls, when we try
    to verify if a possibly existing tag really exists.

    In order to reduce the overall API calls, we build an
    internal cache to prevent verifying the same possibly
    existing tag many times.

    Entries are keyed by a ``(project, version)`` tuple of strings;
    the value is the set of tags already handed out for that key.
    """

    def __init__(self) -> None:
        # (project, version) -> tags that were already cached
        self.data: dict[Tuple[str, str], set[str]] = {}

    def __getitem__(self, key: Any) -> set[str]:
        """Get the set of all cached tags for a key.

        Raises KeyError if @key is malformed or nothing is cached for it.
        """
        return self.data[self._validate_key(key)]

    def _validate_key(self, key: Tuple[str, str]) -> Tuple[str, str]:
        """Return @key unchanged if it is a 2-tuple of strings.

        Raises KeyError for anything else, including objects that
        have no length at all (e.g. an int used as subscript).
        """
        well_formed = (
            isinstance(key, tuple)
            and len(key) == 2
            and all(isinstance(part, str) for part in key)
        )
        if not well_formed:
            # note the trailing blank: the two literals are concatenated
            raise KeyError(f'{self.__class__.__name__} key must consist of '
                           'a project name and a version string')
        return key

    def add(self, project: str, version: str, tag: str) -> None:
        """Cache a tag for a specific project and version."""
        key = self._validate_key((project, version))
        self.data.setdefault(key, set()).add(tag)

    def filter(self, project: str, version: str, data: Any) -> List[str]:
        """Return the items of @data that are not cached yet.

        @data may be a single tag (str) or any iterable of tags.
        The order of @data is preserved and the cache is not modified.
        Raises ValueError if @data is neither a str nor an iterable.
        """
        if isinstance(data, str):
            data = [data]
        elif not isinstance(data, Iterable):
            raise ValueError('Expecting an iterable of tags!')
        key = self._validate_key((project, version))
        # look the cached set up once instead of once per item
        cached = self.data.get(key, ())
        return [item for item in data if item not in cached]

    def filter_and_cache(self, project: str, version: str, data: Any
                         ) -> List[str]:
        """Convenience method to do filtering and adding in one run.

        Returns the tags from @data that were not cached before, each
        of them exactly once (first occurrence wins), and caches them.
        A tag repeated within @data counts as cached as soon as its
        first occurrence was seen, so callers never verify it twice.
        """
        candidates = list(dict.fromkeys(self.filter(project, version, data)))
        for tag in candidates:
            self.add(project, version, tag)
        return candidates
@staticmethod
def _trailing_zeroes(input_string: str) -> int:
    """Determine length of longest all 0 suffix in @input_string."""
    for cnt, letter in enumerate(reversed(input_string)):
        if letter != '0':
            return cnt
    return len(input_string)  # it's all 0s

def _gen_tags(self, project: str, version: str, tag: str) -> List[str]:
    """For @project, generate a new set of plausible tags.

    @tag contains a tag known to exist in @project, while
    @version is a semver-ish version string we want encoded
    using @tag as a (sort of) template.

    The version-like part of @tag is located with self.version_regex;
    whatever surrounds it is kept as prefix/suffix. The separators
    (and zero padding) found between the version components of @tag
    are then reused to spell out @version.

    Returns only those guesses that self.tag_cache has not handed
    out before for (@project, @version); they are cached on return.
    """
    match = self.version_regex.search(tag)
    if match is None:
        # nothing version-like in @tag: treat all of it as the version
        best_guess, prefix, suffix = tag, '', ''
    else:
        best_guess = match.group(0)
        prefix, suffix = tag.split(best_guess, 1)

    semtag = self.to_semver_string(tag).split('.')
    semcmp = self.to_semver_string(version).split('.')

    # reverse engineer a best_guess from @version
    engineered_guess = []
    remainder = best_guess
    # IIRC to_semver_string() can return versions with more
    # than 3 components
    for tag_ver, param_ver in zip(semtag, semcmp):
        if not tag_ver or tag_ver not in remainder:
            # The semver form has a component the tag text does not
            # contain; unpacking the split() result would raise
            # ValueError, so stop templating here instead.
            break
        chunk, remainder = remainder.split(tag_ver, 1)
        leading_zeroes = self._trailing_zeroes(chunk)
        delta_digits = len(param_ver) - len(tag_ver)
        if leading_zeroes >= delta_digits >= 0:
            # The new component is delta_digits longer: give up that
            # many padding zeroes. Slicing from the front keeps the
            # chunk intact for delta_digits == 0, whereas chunk[:-0]
            # would be the empty string and swallow the separator.
            chunk = chunk[:len(chunk) - delta_digits]
        engineered_guess.append(chunk)
        engineered_guess.append(param_ver)

    engineered = ''.join(engineered_guess)
    dotted = '.'.join(semcmp)
    # dict.fromkeys() drops duplicate guesses but keeps their priority
    guesses = tuple(dict.fromkeys((
        prefix + engineered + suffix,
        prefix + engineered,
        'v' + engineered,
        engineered,
        prefix + dotted + suffix,
        prefix + dotted,
        'v' + dotted,
        dotted,
    )))

    return self.tag_cache.filter_and_cache(project, version, guesses)
@@ -269,63 +370,26 @@ def version_to_github_tag(self, version, github_ref, version_prefix=None): print_yellow(" " + str(err)) return "" - semcmp = self.to_semver_string(version).split('.') url = repo['tags_url'] +'?per_page=100' res = self.github_request(url, self.github_name, self.github_token, return_response=True) pages = self._get_link_page(res, 'last') - prefix, suffix = None, None # tag parts not relevant to versioning for _ in range(pages): # we prefer this over "while True" # note: in res.json() we already have the first results page try: - tags = {tag['name']:tag for tag in res.json()} + tags = {tag['name']:tag for tag in res.json() + if version_prefix is None \ + or tag['name'].startswith(version_prefix)} except (TypeError, KeyError) as err: # res.json() did not give us an iterable of things where - # 'name' is a viable index. + # 'name' is a viable index, for instance an error message tags = {} - for tag in tags: - matching_tag = self.get_matching_tag([tags[tag]], version, url) - if len(matching_tag) > 0: # we found what we believe is - return matching_tag # the correct tag - # if we have seen this _prefix and _suffix, - # we already tried to guess the tag and failed - # no need to try again - best_guess = self.version_regex.search(tag) - best_guess = tag if best_guess is None else best_guess.group(0) - _prefix, _suffix = tag.split(best_guess, 1) - if prefix == _prefix and suffix == _suffix: - continue - prefix, suffix = _prefix, _suffix - - # reverse engineer a best_guess from @version - engineered_guess = [] - remainder = best_guess - semtag = self.to_semver_string(tag).split('.') - # IIRC to_semver_string() can return versions with more - # than 3 components - for tag_ver, param_ver in zip(semtag, semcmp): - chunk, remainder = remainder.split(tag_ver, 1) - leading_zeroes = self._trailing_zeroes(chunk) - delta_digits = len(param_ver) - len(tag_ver) - if leading_zeroes >= delta_digits >= 0: - chunk = chunk[:-leading_zeroes]\ - + '0' * 
min(leading_zeroes, - (leading_zeroes - delta_digits)) - engineered_guess.append(chunk) - engineered_guess.append(param_ver) - - guesses = { # using set to void generating duplicates - prefix + ''.join(engineered_guess) + suffix, - prefix + ''.join(engineered_guess), - 'v' + ''.join(engineered_guess), - ''.join(engineered_guess), - prefix + '.'.join(semcmp) + suffix, - prefix + '.'.join(semcmp), - 'v' + '.'.join(semcmp), - '.'.join(semcmp), - } - for guess in guesses: + for name, api_obj in tags.items(): + source_url = self.get_matching_tag([api_obj], version, url) + if len(source_url) > 0: # we found what we believe is + return source_url # the correct source_url + for guess in self._gen_tags(repo['full_name'], version, name): if guess in tags: # found on current result-page return self._render_github_source_url(repo, guess) url = repo['git_refs_url'].replace('{/sha}', '{sha}', 1) @@ -380,8 +444,8 @@ def find_github_url(self, component: Component, use_language: bool = True) -> st if len(name_match): for match in name_match: tag_info = self.github_request(match["tags_url"], self.github_name, self.github_token) - print(f'find_github_url version_to_github_tag {component.version} {match["tags_url"]}') - new_style = self.version_to_github_tag(component.version, match["tags_url"]) + print(f'find_github_url get_matching_source_url {component.version} {match["tags_url"]}') + new_style = self.get_matching_source_url(component.version, match["tags_url"]) source_url = self.get_matching_tag(tag_info, component.version or "", match["html_url"]) print(f'UPGRADE find_github_url{new_style == source_url} old({source_url}) new({new_style})') if len(name_match) == 1: @@ -450,7 +514,7 @@ def find_golang_url(self, component: Component) -> str: if repository_name.startswith("https://github.com/"): repository_name = repository_name[len("https://github.com/"):] - new_style = self.version_to_github_tag(component_version, repository_name) + new_style = 
self.get_matching_source_url(component_version, repository_name) tag_info = self.get_github_info(repository_name, self.github_name, self.github_token) tag_info_checked = self.check_for_github_error(tag_info) source_url = self.get_matching_tag(tag_info_checked, component_version, @@ -475,8 +539,8 @@ def get_github_source_url(self, github_url: str, version: str) -> str: if self.verbose: print_text(" repo_name:", repo_name) - print(f'get_github_source_url version_to_github_tag {version} {repo_name}') - new_style = self.version_to_github_tag(version, repo_name) + print(f'get_github_source_url get_matching_source_url {version} {repo_name}') + new_style = self.get_matching_source_url(version, repo_name) tag_info = self.get_github_info(repo_name, self.github_name, self.github_token) tag_info_checked = self.check_for_github_error(tag_info) source_url = self.get_matching_tag(tag_info_checked, version, github_url) diff --git a/tests/test_find_sources.py b/tests/test_find_sources.py index 6cc4f9c..cf66f96 100644 --- a/tests/test_find_sources.py +++ b/tests/test_find_sources.py @@ -57,6 +57,7 @@ def git_refs_url(self): def api_object(self): """Limited to the values we access ...""" return { + 'full_name': self.name, 'tags_url': self.tags_url, 'archive_url': self.archive_url, 'git_refs_url': self.git_refs_url, @@ -399,7 +400,7 @@ def test_get_pkg_go_repo_url_error(self, mock_requests_get: Any) -> None: repo_url = find_sources.get_pkg_go_repo_url('some/package') self.assertEqual(repo_url, 'https://pkg.go.dev/some/package') - @patch('capycli.bom.findsources.FindSources.version_to_github_tag') + @patch('capycli.bom.findsources.FindSources.get_matching_source_url') @patch('capycli.bom.findsources.FindSources.get_pkg_go_repo_url') @patch('capycli.bom.findsources.FindSources.get_matching_tag') @patch('capycli.bom.findsources.FindSources.get_github_info') @@ -407,7 +408,7 @@ def test_find_golang_url_github(self, mock_get_github_info: Any, mock_get_matching_tag: Any, 
mock_get_pkg_go_repo_url: Any, - mock_version_to_github_tag: Any, + mock_get_matching_source_url: Any, ) -> None: # Mocking a GitHub scenario runc = { # real data as of 2024-11-18 @@ -417,7 +418,7 @@ def test_find_golang_url_github(self, mock_get_github_info.return_value = [] mock_get_matching_tag.return_value = runc['zipball_url'] mock_get_pkg_go_repo_url.return_value = runc['html_url'] - mock_version_to_github_tag.return_value = runc['zipball_url'] + mock_get_matching_source_url.return_value = runc['zipball_url'] find_sources = FindSources() component = MagicMock() component.name = 'github.com/opencontainers/runc' @@ -537,20 +538,20 @@ def test_get_source_url_exception(self) -> None: @patch('capycli.bom.findsources.FindSources.get_pkg_go_repo_url') @patch('capycli.bom.findsources.FindSources.github_request') - def test_version_to_github_tag(self, + def test_get_matching_source_url(self, mock_github_request: Any, mock_get_pkg_go_repo_url: Any, ) -> None: - """various version_to_github_tag() invocations. + """various get_matching_source_url() invocations. 
from find_github_url - self.version_to_github_tag(component.version, match["tags_url"]) + self.get_matching_source_url(component.version, match["tags_url"]) -> run tests with tags_url from find_golang_url (with lengthy parameter preparation) - self.version_to_github_tag(component_version, repository_name) + self.get_matching_source_url(component_version, repository_name) -> run tests with project.name from get_github_source_url (w/ repo_name from get_repo_name()) - self.version_to_github_tag(version, repo_name) + self.get_matching_source_url(version, repo_name) """ out = FindSources() # Object Under Test mock_github_request.side_effect = self.mock_github_request_side_effect @@ -561,10 +562,10 @@ def test_version_to_github_tag(self, tag_name = project.data[0]['name'] except (IndexError, KeyError, TypeError): # expect negative results - res = out.version_to_github_tag(tag_name, project.tags_url) + res = out.get_matching_source_url(tag_name, project.tags_url) self.assertEqual(res, '') else: - res = out.version_to_github_tag(tag_name, project.tags_url) + res = out.get_matching_source_url(tag_name, project.tags_url) # assertions based on _render_github_source_url() self.assertTrue(res.startswith('https://github.com')) self.assertIn('archive/refs/tags', res) @@ -590,9 +591,9 @@ def test_version_to_github_tag(self, forged_entry[key] = value project.data.append(forged_entry) # most importantly, the entry with the forged tag is not the - # first entry and therefore version_to_github_tag will try + # first entry and therefore get_matching_source_url will try # at least once to guess the forged_tag - res = out.version_to_github_tag(forged_tag, project.tags_url) + res = out.get_matching_source_url(forged_tag, project.tags_url) # assertions based on _render_github_source_url() self.assertTrue(res.startswith('https://github.com')) self.assertIn('archive/refs/tags', res) @@ -605,10 +606,10 @@ def test_version_to_github_tag(self, tag_name = project.data[0]['name'] except 
(IndexError, KeyError, TypeError): # expect negative results - res = out.version_to_github_tag(tag_name, project.name) + res = out.get_matching_source_url(tag_name, project.name) self.assertEqual(res, '') else: - res = out.version_to_github_tag(tag_name, project.name) + res = out.get_matching_source_url(tag_name, project.name) # assertions based on _render_github_source_url() self.assertTrue(res.startswith('https://github.com')) self.assertIn('archive/refs/tags', res) @@ -619,10 +620,10 @@ def test_version_to_github_tag(self, tag_name = project.data[0]['name'] except (IndexError, KeyError, TypeError): # expect negative results - res = out.version_to_github_tag(tag_name, project.git_url) + res = out.get_matching_source_url(tag_name, project.git_url) self.assertEqual(res, '') else: - res = out.version_to_github_tag(tag_name, project.name) + res = out.get_matching_source_url(tag_name, project.name) # assertions based on _render_github_source_url() self.assertTrue(res.startswith('https://github.com')) self.assertIn('archive/refs/tags', res) @@ -642,11 +643,11 @@ def test_version_to_github_tag(self, forged_tag = tag_name[0:pos] \ + str(int(split_semver[1]) + 2 * len(project.data)) \ + tag_name[pos+len(split_semver[1]):] - res = out.version_to_github_tag(forged_tag, project.tags_url) + res = out.get_matching_source_url(forged_tag, project.tags_url) self.assertEqual(res, '') # not sure if instead of testing this we should remove the code - res = out.version_to_github_tag('0.0', 'https://gitlab.com/unit/test') + res = out.get_matching_source_url('0.0', 'https://gitlab.com/unit/test') self.assertEqual(res, '')