Skip to content

Commit

Permalink
refactor(FindSources): get_matching_source_url
Browse files Browse the repository at this point in the history
* I dubbed the original implementation version_to_github_tag, but on success
  it would actually return a source URL, not a GitHub tag.
  => renamed by analogy with get_matching_tag(), which it aims to replace
* moved tag guessing heuristic to its own method _gen_tags()
* introduced TagCache to avoid throwing the same bad guesses at the GitHub
  API over and over again. It is used transparently in _gen_tags(). This
  means it is perfectly viable for _gen_tags() to return an empty list.
* also, addressed the mypy shenanigans
  • Loading branch information
16Martin committed Nov 20, 2024
1 parent 4730fa3 commit 5259baa
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 74 deletions.
176 changes: 120 additions & 56 deletions capycli/bom/findsources.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import re
import sys
import time
from collections.abc import Iterable
from typing import Any, Dict, List, Tuple
from urllib.parse import urlparse, parse_qs

Expand All @@ -37,13 +38,66 @@
class FindSources(capycli.common.script_base.ScriptBase):
"""Go through the list of SBOM items and try to determine the source code."""

class TagCache:
    """Remember which tag guesses were already handed out.

    A key task performed in this module is fetching tags from GitHub
    and matching tags to (component) versions. This task includes many
    calls to the GitHub API, which we seek to limit by implementing
    an internal cache and a logic to guess tags, instead of
    performing exhaustive searches.

    Generating many possibly existing tags from an actually
    existing tag may result in a lot of API calls, when we try
    to verify if a possibly existing tag really exists.
    In order to reduce the overall API calls, we build an
    internal cache to prevent verifying the same possibly
    existing tag many times.

    Entries are keyed by a ``(project, version)`` tuple of strings.
    """

    def __init__(self) -> None:
        # (project, version) -> tags that were already cached for it
        self.data: dict[Tuple[str, str], set[str]] = {}

    def __getitem__(self, key: Any) -> set[str]:
        """Get the set of all cached tags for a key.

        Raises KeyError if @key is malformed or nothing was cached
        for it yet.
        """
        return self.data[self._validate_key(key)]

    def _validate_key(self, key: Tuple[str, str]) -> Tuple[str, str]:
        """Return @key unchanged if it is a 2-tuple of strings.

        Raises KeyError for anything else, including objects that
        are not tuples at all.
        """
        is_valid = (
            isinstance(key, tuple)
            and len(key) == 2
            and all(isinstance(part, str) for part in key)
        )
        if not is_valid:
            # note the trailing blank: the two literals are concatenated
            raise KeyError(f'{self.__class__.__name__} key must consist of '
                           'a project name and a version string')
        return key

    def add(self, project: str, version: str, tag: str) -> None:
        """Cache a tag for a specific project and version."""
        key = self._validate_key((project, version))
        self.data.setdefault(key, set()).add(tag)

    def filter(self, project: str, version: str, data: Any) -> List[str]:
        """Return the items of @data that are not cached yet.

        @data may be a single tag (str) or any iterable of tags; the
        order of @data is preserved. Raises ValueError if @data is
        neither a string nor an iterable.
        """
        if isinstance(data, str):
            data = [data]
        elif not isinstance(data, Iterable):
            raise ValueError('Expecting an iterable of tags!')
        key = self._validate_key((project, version))
        # look the cached tags up once instead of once per item
        known: Any = self.data.get(key, ())
        return [item for item in data if item not in known]

    def filter_and_cache(self, project: str, version: str, data: Any
                         ) -> List[str]:
        """Convenience method to do filtering and adding in one run.

        Returns the tags from @data that were not cached before, in
        their original order and without duplicates, and caches them.
        """
        # filter() checks against the cache only, so a tag repeated
        # within @data would otherwise be reported more than once
        candidates = list(dict.fromkeys(self.filter(project, version, data)))
        for tag in candidates:
            self.add(project, version, tag)
        return candidates


def __init__(self) -> None:
    """Initialise settings, GitHub credentials, regexes and the tag cache."""
    # cache of tag guesses, see TagCache
    self.tag_cache = self.TagCache()

    # server URL is taken from the environment, empty if not set
    self.sw360_url: str = os.environ.get("SW360ServerUrl", "")

    # GitHub credentials start out empty
    self.github_token: str = ""
    self.github_name: str = ""

    # pre-compiled patterns used when inspecting versions and project names
    self.github_project_name_regex = re.compile(r"^[a-zA-Z0-9-]+(/[a-zA-Z0-9-]+)*$")
    self.version_regex = re.compile(r"[\d+\.|_]+[\d+]")

    self.verbose: bool = False

def is_sourcefile_accessible(self, sourcefile_url: str) -> bool:
"""Check if the URL is accessible."""
Expand Down Expand Up @@ -160,7 +214,7 @@ def get_github_info(repository_url: str, username: str = "",
tags.extend(tmp)
return tags

def _get_github_repo(self, github_ref):
def _get_github_repo(self, github_ref: str) -> dict[str, Any]:
"""Fetch GitHub API object identified by @github_ref.
This method interpretes @github_ref and fetches the
referenced project's API object from GitHub.
Expand Down Expand Up @@ -194,7 +248,7 @@ def _get_github_repo(self, github_ref):
raise ValueError(f"Unable to make @github_ref {github_ref} work!")
return repo

def _get_link_page(self, res, which='next'):
def _get_link_page(self, res: requests.Response, which: str='next') -> int:
"""Fetch only page number from link-header."""
try:
url = urlparse(res.links[which]['url'])
Expand All @@ -203,14 +257,59 @@ def _get_link_page(self, res, which='next'):
return 1

@staticmethod
def _trailing_zeroes(input_string):
def _trailing_zeroes(input_string: str) -> int:
    """Return the length of the longest all-'0' suffix of @input_string.

    A string consisting only of '0' characters (including the empty
    string) yields its full length.
    """
    # whatever rstrip() removes is exactly the run of trailing zeroes
    return len(input_string) - len(input_string.rstrip('0'))

def _render_github_source_url(self, repo, ref):
def _gen_tags(self, project: str, version: str, tag: str) -> List[str]:
    """For @project, generate a new list of plausible tags.

    @tag contains a tag known to exist in @project, while
    @version is a semver-ish version string we want encoded
    using @tag as a (sort of) template.

    The guesses are ordered from "closest to the template" to
    "plain dotted version" and contain no duplicates. Guesses that
    self.tag_cache already knows for (@project, @version) are
    dropped and the remaining ones are cached, so the result may
    well be an empty list.
    """
    # split the template into the part that looks like a version
    # and whatever surrounds it
    version_match = self.version_regex.search(tag)
    if version_match is None:
        best_guess, prefix, suffix = tag, '', ''
    else:
        best_guess = version_match.group(0)
        prefix, suffix = tag.split(best_guess, 1)

    semtag = self.to_semver_string(tag).split('.')
    semcmp = self.to_semver_string(version).split('.')

    # reverse engineer a best_guess from @version
    engineered_guess: List[str] = []
    remainder = best_guess
    # IIRC to_semver_string() can return versions with more
    # than 3 components
    for tag_ver, param_ver in zip(semtag, semcmp):
        if not tag_ver or tag_ver not in remainder:
            # The semver component has no literal counterpart in the
            # tag (presumably added or normalised by
            # to_semver_string()); unpacking the split() result would
            # raise ValueError, so stop with what we have.
            break
        chunk, remainder = remainder.split(tag_ver, 1)
        leading_zeroes = self._trailing_zeroes(chunk)
        delta_digits = len(param_ver) - len(tag_ver)
        if leading_zeroes >= delta_digits >= 0:
            # Keep the separator part of chunk and adjust the zero
            # padding. Do not slice with [:-leading_zeroes]: for
            # leading_zeroes == 0 that is [:0] and wipes the chunk.
            chunk = chunk[:len(chunk) - leading_zeroes]\
                + '0' * (leading_zeroes - delta_digits)
        engineered_guess.append(chunk)
        engineered_guess.append(param_ver)

    # most template-like rendering first, plain dotted version last
    renderings = []
    engineered = ''.join(engineered_guess)
    if engineered:  # nothing engineered => no point in guessing 'v' or ''
        renderings.append(engineered)
    renderings.append('.'.join(semcmp))

    guesses: List[str] = []
    for rendering in renderings:
        guesses.extend((
            prefix + rendering + suffix,
            prefix + rendering,
            'v' + rendering,
            rendering,
        ))

    # drop duplicates (e.g. empty prefix/suffix) but keep the order
    unique_guesses = list(dict.fromkeys(guesses))
    return self.tag_cache.filter_and_cache(project, version, unique_guesses)

def _render_github_source_url(self, repo: dict[str, Any], ref: str) -> str:
"""From API knowledge, create a download link.
There are quite a few indirections involved, when downloading
source code from GitHub. With this method we build what we
Expand All @@ -229,7 +328,9 @@ def _render_github_source_url(self, repo, ref):
"zipball/refs/tags", "archive/refs/tags"
) + '.zip'

def version_to_github_tag(self, version, github_ref, version_prefix=None):
def get_matching_source_url(self, version: Any, github_ref: str,
version_prefix: Any=None
) -> str:
"""Heuristics to find a tag in GitHub that corresponds to
@version in the project identified by @github_ref.
Expand Down Expand Up @@ -269,63 +370,26 @@ def version_to_github_tag(self, version, github_ref, version_prefix=None):
print_yellow(" " + str(err))
return ""

semcmp = self.to_semver_string(version).split('.')
url = repo['tags_url'] +'?per_page=100'
res = self.github_request(url, self.github_name,
self.github_token, return_response=True)
pages = self._get_link_page(res, 'last')
prefix, suffix = None, None # tag parts not relevant to versioning
for _ in range(pages): # we prefer this over "while True"
# note: in res.json() we already have the first results page
try:
tags = {tag['name']:tag for tag in res.json()}
tags = {tag['name']:tag for tag in res.json()
if version_prefix is None \
or tag['name'].startswith(version_prefix)}
except (TypeError, KeyError) as err:
# res.json() did not give us an iterable of things where
# 'name' is a viable index.
# 'name' is a viable index, for instance an error message
tags = {}

for tag in tags:
matching_tag = self.get_matching_tag([tags[tag]], version, url)
if len(matching_tag) > 0: # we found what we believe is
return matching_tag # the correct tag
# if we have seen this _prefix and _suffix,
# we already tried to guess the tag and failed
# no need to try again
best_guess = self.version_regex.search(tag)
best_guess = tag if best_guess is None else best_guess.group(0)
_prefix, _suffix = tag.split(best_guess, 1)
if prefix == _prefix and suffix == _suffix:
continue
prefix, suffix = _prefix, _suffix

# reverse engineer a best_guess from @version
engineered_guess = []
remainder = best_guess
semtag = self.to_semver_string(tag).split('.')
# IIRC to_semver_string() can return versions with more
# than 3 components
for tag_ver, param_ver in zip(semtag, semcmp):
chunk, remainder = remainder.split(tag_ver, 1)
leading_zeroes = self._trailing_zeroes(chunk)
delta_digits = len(param_ver) - len(tag_ver)
if leading_zeroes >= delta_digits >= 0:
chunk = chunk[:-leading_zeroes]\
+ '0' * min(leading_zeroes,
(leading_zeroes - delta_digits))
engineered_guess.append(chunk)
engineered_guess.append(param_ver)

guesses = { # using set to void generating duplicates
prefix + ''.join(engineered_guess) + suffix,
prefix + ''.join(engineered_guess),
'v' + ''.join(engineered_guess),
''.join(engineered_guess),
prefix + '.'.join(semcmp) + suffix,
prefix + '.'.join(semcmp),
'v' + '.'.join(semcmp),
'.'.join(semcmp),
}
for guess in guesses:
for name, api_obj in tags.items():
source_url = self.get_matching_tag([api_obj], version, url)
if len(source_url) > 0: # we found what we believe is
return source_url # the correct source_url
for guess in self._gen_tags(repo['full_name'], version, name):
if guess in tags: # found on current result-page
return self._render_github_source_url(repo, guess)
url = repo['git_refs_url'].replace('{/sha}', '{sha}', 1)
Expand Down Expand Up @@ -380,8 +444,8 @@ def find_github_url(self, component: Component, use_language: bool = True) -> st
if len(name_match):
for match in name_match:
tag_info = self.github_request(match["tags_url"], self.github_name, self.github_token)
print(f'find_github_url version_to_github_tag {component.version} {match["tags_url"]}')
new_style = self.version_to_github_tag(component.version, match["tags_url"])
print(f'find_github_url get_matching_source_url {component.version} {match["tags_url"]}')
new_style = self.get_matching_source_url(component.version, match["tags_url"])
source_url = self.get_matching_tag(tag_info, component.version or "", match["html_url"])
print(f'UPGRADE find_github_url{new_style == source_url} old({source_url}) new({new_style})')
if len(name_match) == 1:
Expand Down Expand Up @@ -450,7 +514,7 @@ def find_golang_url(self, component: Component) -> str:

if repository_name.startswith("https://github.com/"):
repository_name = repository_name[len("https://github.com/"):]
new_style = self.version_to_github_tag(component_version, repository_name)
new_style = self.get_matching_source_url(component_version, repository_name)
tag_info = self.get_github_info(repository_name, self.github_name, self.github_token)
tag_info_checked = self.check_for_github_error(tag_info)
source_url = self.get_matching_tag(tag_info_checked, component_version,
Expand All @@ -475,8 +539,8 @@ def get_github_source_url(self, github_url: str, version: str) -> str:

if self.verbose:
print_text(" repo_name:", repo_name)
print(f'get_github_source_url version_to_github_tag {version} {repo_name}')
new_style = self.version_to_github_tag(version, repo_name)
print(f'get_github_source_url get_matching_source_url {version} {repo_name}')
new_style = self.get_matching_source_url(version, repo_name)
tag_info = self.get_github_info(repo_name, self.github_name, self.github_token)
tag_info_checked = self.check_for_github_error(tag_info)
source_url = self.get_matching_tag(tag_info_checked, version, github_url)
Expand Down
Loading

0 comments on commit 5259baa

Please sign in to comment.