From 16f3918494f74394b20e3a2f3447b8d86f3d98e8 Mon Sep 17 00:00:00 2001 From: Martin Metzker <60664558+16Martin@users.noreply.github.com> Date: Wed, 13 Nov 2024 09:44:53 +0100 Subject: [PATCH 1/4] feat(findsources): new tag lookup strategy --- capycli/bom/findsources.py | 299 +++++++++++++++++++++++++++++++++++-- 1 file changed, 290 insertions(+), 9 deletions(-) diff --git a/capycli/bom/findsources.py b/capycli/bom/findsources.py index ae03e18..cef2de2 100644 --- a/capycli/bom/findsources.py +++ b/capycli/bom/findsources.py @@ -12,6 +12,7 @@ import sys import time from typing import Any, Dict, List, Tuple +from urllib.parse import urlparse, parse_qs import requests import semver @@ -40,6 +41,7 @@ def __init__(self) -> None: self.verbose: bool = False self.version_regex = re.compile(r"[\d+\.|_]+[\d+]") self.github_project_name_regex = re.compile(r"^[a-zA-Z0-9-]+(/[a-zA-Z0-9-]+)*$") + self.github_header_link_regex = re.compile(r'<([^>]*)>\s*;\s*rel\s*=\s*(("[^"]*")|(\'[^\']*\')|([^ ]*))') self.github_name: str = "" self.github_token: str = "" self.sw360_url: str = os.environ.get("SW360ServerUrl", "") @@ -70,14 +72,17 @@ def is_sourcefile_accessible(self, sourcefile_url: str) -> bool: return False @staticmethod - def github_request(url: str, username: str = "", token: str = "") -> Any: + def github_request(url: str, username: str = "", token: str = "", + return_response: bool = False, + allow_redirects: bool = True, # default in requests + ) -> Any: try: headers = {} if token: headers["Authorization"] = "token " + token if username: headers["Username"] = username - response = requests.get(url, headers=headers) + response = requests.get(url, headers=headers, allow_redirects=allow_redirects) if not response.ok: if response.status_code == 429 or \ 'rate limit exceeded' in response.reason or \ @@ -87,17 +92,16 @@ def github_request(url: str, username: str = "", token: str = "") -> Any: " Github API rate limit exceeded - wait 60s and retry ... " + Style.RESET_ALL) time.sleep(60) - return FindSources.github_request(url, username, token) - - return response.json() + return FindSources.github_request(url, username, token, return_response=return_response) except Exception as ex: print( Fore.LIGHTYELLOW_EX + " Error accessing GitHub: " + repr(ex) + Style.RESET_ALL) - - return {} + response = requests.Response() + response._content = f'{"exception": "{repr(ex)}"}'.encode() + return response if return_response else response.json() @staticmethod def get_repositories(name: str, language: str, username: str = "", token: str = "") -> Any: @@ -157,6 +161,274 @@ def get_github_info(repository_url: str, username: str = "", tags.extend(tmp) return tags + def github_api_request(self, url, **kwargs): + """Non static method to query GitHub. Also using slightly + different headers. 
+ ToDo: not relevant to the topic at hand + """ + headers = {'Accept': 'application/vnd.github+json', + 'Authorization': f'Bearer {self.github_token}', + 'X-GitHub-Api-Version': '2022-11-28', + } + if self.github_name: + headers["Username"] = self.github_name + method = kwargs.get('method', 'GET') + handle_429 = bool(kwargs.get('handle_429', False)) + req_kwargs = {} + # extend the tuple to enable more request.request kwargs + for key in ('allow_redirects', + ): + if key in kwargs: + req_kwargs[key] = kwargs[key] + try: + res = requests.request(method, url, headers=headers, **req_kwargs) + if res.status_code == 429 and handle_429 is True\ + or 'rate limit exceeded' in res.reason \ + or "API rate limit exceeded" in res.json().get("message"): + print(f"{Fore.LIGHTYELLOW_EX}" + " Github API rate limit exceeded" + " - wait 60s and retry ... " + f"{Style.RESET_ALL}") + time.sleep(60) + return self.github_api_request(url, **kwargs) + except Exception as ex: + print( + Fore.LIGHTYELLOW_EX + + " Error accessing GitHub: " + repr(ex) + + Style.RESET_ALL) + res = requests.Response() + return res + + def _get_github_repo(self, github_ref): + """Fetch GitHub API object identified by @github_ref. + @github_ref could be a simple "/" string or any + kind of the plethora of links that refer to a project on + GitHub. This method interpretes @github_ref and fetches the + referenced project's data from GitHub. + """ + url = 'api.github.com/repos/' + gh_ref = urlparse(github_ref, scheme='no_scheme_provided') + if gh_ref.scheme == 'no_scheme_provided': + # interprete @github_ref as OWNER/REPO + url += gh_ref.path + else: + # there is an actual :// in @github_ref + if not gh_ref.netloc.endswith('github.com'): + raise ValueError(f'{github_ref} is not an expected @github_ref!') + if gh_ref.path.startswith('/repos'): + url += gh_ref.path[6:] + else: + url += gh_ref.path + if url.endswith('.git'): + url = url[0:-4] + url = url.replace('//', '/') # this is why add https:// late + url = f'https://{url}' + repo = {} + while 'tags_url' not in repo and 'github.com' in url: + repo = self.github_request(url, self.github_name, self.github_token) + url = url.rsplit('/', 1)[0] # remove last path segment + if 'tags_url' not in repo: + raise ValueError(f"Unable to make @github_ref {github_ref} work!") + return repo + + def _get_link(self, link, which='next'): + """Helper to read link-Header from GitHub API responses.""" + for match in self.github_header_link_regex.findall(link): + try: + rel = match[2] + match[3] + match[4] # two of these will be empty + if rel == which: + return match[0] + except IndexError: + print(f'_get_link unable to match! |{link}| |{which}|') + print(match) + print(self.github_header_link_regex.findall(link)) + + return None + + def _get_link_page(self, link, which='next'): + """Helper to only get page number from link-Header.""" + url = urlparse(self._get_link(link, which)) + try: + return parse_qs(url.query)['page'][0] + except KeyError: # no page in query + return 1 + + @staticmethod + def trailing_zeroes(data): + """Count length of klongest all 0 suffix in @data""" + cnt = 0 + for letter in reversed(data): + if letter == '0': + cnt += 1 + else: + break + return cnt + + def version_to_github_tag(self, version, github_ref, version_prefix=None): + """Heuristics to find a tag in GitHub that corresponds to + @version in the project identified by @github_ref. + + First we must normalize @github_ref, because we are unsure + what is actually passed as this paramter. 
Using urlparse() + we save ourselves a little bit of work with trailing + queries and fragments, but any @github_ref with colons where + the first colon is not part of '://' will not yield viable + results, e.g. 'api.github.com:443/repos/sw360/capycli'. + + Then we start guessing tags. + We only care about such tags that produce a non empty match + with self.version_regex, because only these would ever yield + accepted compare() results in get_matching_tag(). Every such + tag can be read as a fixed prefix, a substring as matched by + self.version_regex followed by a suffix. Usually, prefix + will be "v" and the suffix will be empty, but sometimes tags + are more elaborate. + We expect the only the regex-matchable part of a tag changes + from version to version, while the prefix and the suffix are + static. + Given a tag with a static prefix, a static suffix and a + self.version_regex-matchable substring, we can generate + tag names from semantic versions, by reverseing the logic + implemented in to_semver_string(). + Comparing the original matchable substring, to the result of + to_semver_string() we should be able to generate similar + matchable substrings from @version. + """ + matching_tag = '' + semcmp = self.to_semver_string(version).split('.') + repo = self._get_github_repo(github_ref) + print(f"version_to_github_tag: {github_ref} -> {repo['tags_url']}") + + + url = repo['tags_url'] +'?per_page=100' + res = self.github_request(url, self.github_name, + self.github_token, return_response=True) + pages = self._get_link_page(res.headers.get('link', ''), 'last') + try: + for _ in range(pages): + # note: in res.json() we already have the first page + tags = [tag['name'] for tag in res.json()] + prefix = None + suffix = None + for tag in tags: + # ex: tag['name'] = foo01_02_03bar + try: + best_guess = self.version_regex.search(tag).group(0) + except AttributeError as err: + print(f'AttributeError {err}') + print(f' tag: {tag}') + best_guess = tag + # ex => best_guess = 01_02_03 + _prefix, _suffix = tag.split(best_guess, 1) + # ex => _prefix = foo, _suffix = bar + if prefix == _prefix and suffix == _suffix: + continue + prefix, suffix = _prefix, _suffix + + semtag = self.to_semver_string(tag).split('.') + # ex => semtag = ['1', '2', '3'] + + # reverse engineer version string in tag + # ex => semcmp = ['4', '5', '6'] + ver_str_parts = [] + remainder = best_guess + # if i read that correctly to_semver_string() can return + # versions with more than 3 components + for tag_ver, param_ver in zip(semtag, semcmp): + try: + chunk, remainder = remainder.split(tag_ver, 1) + leading_zeroes = self.trailing_zeroes(chunk) + delta_digits = len(param_ver) - len(tag_ver) + if leading_zeroes > 0 \ + and leading_zeroes - delta_digits > 0: + chunk = chunk[:-leading_zeroes]\ + + '0' * (leading_zeroes - delta_digits) + ver_str_parts.append(chunk) + ver_str_parts.append(param_ver) + # ex => ver_str_parts = ['0', '4', '_0', '5', '_0', '6'] + except ValueError as err: + # sometimes there are wonky tags that are not + # even meant to be a release + # print(f'ValueError {err}') + # print(f' best_guess: {best_guess}') + # print(f' remainder: {remainder}') + # print(' ', semtag, semcmp) + # print(' ', tag_ver, param_ver) + # print(' ', ver_str_parts) + pass + + guesses = { # avoid generating duplicates + prefix + ''.join(ver_str_parts) + suffix, # ex => foo04_05_06bar + prefix + ''.join(ver_str_parts), + 'v' + ''.join(ver_str_parts), + ''.join(ver_str_parts), + prefix + '.'.join(semcmp) + suffix, + prefix + 
'.'.join(semcmp), + 'v' + '.'.join(semcmp), + '.'.join(semcmp), + } + for guess in guesses: + print(f'version_to_github_tag: {tag} {guess} ', end='') + if guess in tags: + print('found on current page') + matching_tag = guess + raise StopIteration() + url = repo['git_refs_url'].replace('{/sha}', '{sha}', 1) + url = url.format(sha='/tags/' + guess) + res = self.github_request(url, self.github_name, + self.github_token, return_response=True) + if res.status_code == 200: + print('is a valid tag') + matching_tag = guess + raise StopIteration() + print(':-(') + + try: + url = self._get_link(res.headers['link'], 'next') + if url is None: + raise StopIteration() + except KeyError as err: + raise StopIteration() from err + res = self.github_request(url, self.github_name, + self.github_token, return_response=True) + print('version_to_github_tag: next page!') + except StopIteration: + pass + + if matching_tag == '': + print_yellow(" No matching tag for version " + version + " found ") + return "" + + url = repo['archive_url'].replace('{/ref}', '{ref}', 1) + url = url.format(archive_format='tarball', ref='/'+matching_tag) + res = self.github_request(url, self.github_name, self.github_token, + return_response=True, allow_redirects=False) + try: + return res.headers['location'] + except KeyError: + print(f'No location in response! {url} {res.status_code}') + print(res.headers) + return "" # fail + + + + + + + + + + + + + + + + + + + + def to_semver_string(self, version: str) -> str: """Bring all version information to a format we can compare.""" result = self.version_regex.search(version) @@ -194,7 +466,10 @@ def find_github_url(self, component: Component, use_language: bool = True) -> st if len(name_match): for match in name_match: tag_info = self.github_request(match["tags_url"], self.github_name, self.github_token) + print(f'find_github_url version_to_github_tag {component.version} {match["tags_url"]}') + new_style = self.version_to_github_tag(component.version, match["tags_url"]) source_url = self.get_matching_tag(tag_info, component.version or "", match["html_url"]) + print(f'UPGRADE find_github_url{new_style == source_url} old({source_url}) new({new_style})') if len(name_match) == 1: return source_url elif source_url: @@ -261,10 +536,13 @@ def find_golang_url(self, component: Component) -> str: if repository_name.startswith("https://github.com/"): repository_name = repository_name[len("https://github.com/"):] + print(f'find_golang_url version_to_github_tag {component_version} {repository_name}') + new_style = self.version_to_github_tag(component_version, repository_name) tag_info = self.get_github_info(repository_name, self.github_name, self.github_token) tag_info_checked = self.check_for_github_error(tag_info) source_url = self.get_matching_tag(tag_info_checked, component_version, repository_name, version_prefix or "") + print(f'UPGRADE {new_style == source_url} old({source_url}) new({new_style})') # component["RepositoryUrl"] = repository_name return source_url @@ -284,10 +562,13 @@ def get_github_source_url(self, github_url: str, version: str) -> str: if self.verbose: print_text(" repo_name:", repo_name) - + print(f'get_github_source_url version_to_github_tag {version} {repo_name}') + new_style = self.version_to_github_tag(version, repo_name) tag_info = self.get_github_info(repo_name, self.github_name, self.github_token) tag_info_checked = self.check_for_github_error(tag_info) - return self.get_matching_tag(tag_info_checked, version, github_url) + source_url = 
self.get_matching_tag(tag_info_checked, version, github_url) + print(f'UPGRADE {new_style == source_url} old({source_url}) new({new_style})') + return source_url def check_for_github_error(self, tag_info: get_github_info_type) -> List[Dict[str, Any]]: if isinstance(tag_info, list): From 0678aca3cd7902035a8045d2c8b9d6bd9bb02a84 Mon Sep 17 00:00:00 2001 From: Martin Metzker <60664558+16Martin@users.noreply.github.com> Date: Fri, 15 Nov 2024 17:29:47 +0100 Subject: [PATCH 2/4] fixup(findSources): cleanup new implementation --- capycli/bom/findsources.py | 325 ++++++++++++++++--------------------- 1 file changed, 136 insertions(+), 189 deletions(-) diff --git a/capycli/bom/findsources.py b/capycli/bom/findsources.py index cef2de2..15edb4e 100644 --- a/capycli/bom/findsources.py +++ b/capycli/bom/findsources.py @@ -161,68 +161,33 @@ def get_github_info(repository_url: str, username: str = "", tags.extend(tmp) return tags - def github_api_request(self, url, **kwargs): - """Non static method to query GitHub. Also using slightly - different headers. - ToDo: not relevant to the topic at hand - """ - headers = {'Accept': 'application/vnd.github+json', - 'Authorization': f'Bearer {self.github_token}', - 'X-GitHub-Api-Version': '2022-11-28', - } - if self.github_name: - headers["Username"] = self.github_name - method = kwargs.get('method', 'GET') - handle_429 = bool(kwargs.get('handle_429', False)) - req_kwargs = {} - # extend the tuple to enable more request.request kwargs - for key in ('allow_redirects', - ): - if key in kwargs: - req_kwargs[key] = kwargs[key] - try: - res = requests.request(method, url, headers=headers, **req_kwargs) - if res.status_code == 429 and handle_429 is True\ - or 'rate limit exceeded' in res.reason \ - or "API rate limit exceeded" in res.json().get("message"): - print(f"{Fore.LIGHTYELLOW_EX}" - " Github API rate limit exceeded" - " - wait 60s and retry ... " - f"{Style.RESET_ALL}") - time.sleep(60) - return self.github_api_request(url, **kwargs) - except Exception as ex: - print( - Fore.LIGHTYELLOW_EX + - " Error accessing GitHub: " + repr(ex) + - Style.RESET_ALL) - res = requests.Response() - return res - def _get_github_repo(self, github_ref): """Fetch GitHub API object identified by @github_ref. + This method interpretes @github_ref and fetches the + referenced project's API object from GitHub. @github_ref could be a simple "/" string or any - kind of the plethora of links that refer to a project on - GitHub. This method interpretes @github_ref and fetches the - referenced project's data from GitHub. + from the plethora of links that refer to a project on + GitHub. + By using urlparse() we save ourselves a little bit of work + with trailing queries and fragments, but any @github_ref with + colons where the first colon is not part of '://' will not + yield viable results, + e.g. 'api.github.com:443/repos/sw360/capycli'. 
""" url = 'api.github.com/repos/' gh_ref = urlparse(github_ref, scheme='no_scheme_provided') if gh_ref.scheme == 'no_scheme_provided': # interprete @github_ref as OWNER/REPO url += gh_ref.path + elif not gh_ref.netloc.endswith('github.com'): + raise ValueError(f'{github_ref} is not an expected @github_ref!') + elif gh_ref.path.startswith('/repos'): + url += gh_ref.path[6:] else: - # there is an actual :// in @github_ref - if not gh_ref.netloc.endswith('github.com'): - raise ValueError(f'{github_ref} is not an expected @github_ref!') - if gh_ref.path.startswith('/repos'): - url += gh_ref.path[6:] - else: - url += gh_ref.path + url += gh_ref.path if url.endswith('.git'): url = url[0:-4] - url = url.replace('//', '/') # this is why add https:// late - url = f'https://{url}' + url = 'https://' + url.replace('//', '/') repo = {} while 'tags_url' not in repo and 'github.com' in url: repo = self.github_request(url, self.github_name, self.github_token) @@ -232,183 +197,165 @@ def _get_github_repo(self, github_ref): return repo def _get_link(self, link, which='next'): - """Helper to read link-Header from GitHub API responses.""" + """Helper to read link-Header from GitHub API responses. + Each URL in the Link-header is labelled with a relation, + usually, first, last, prev or next. We use this method to + retrieve the URL by its relation's name. + """ for match in self.github_header_link_regex.findall(link): - try: - rel = match[2] + match[3] + match[4] # two of these will be empty - if rel == which: - return match[0] - except IndexError: - print(f'_get_link unable to match! |{link}| |{which}|') - print(match) - print(self.github_header_link_regex.findall(link)) - - return None + if which in match[2:5]: + return match[0] + raise KeyError(f'No link to {which}-page in response!') def _get_link_page(self, link, which='next'): - """Helper to only get page number from link-Header.""" - url = urlparse(self._get_link(link, which)) + """Helper to only get the referenced page number a link-header-URL.""" try: + url = urlparse(self._get_link(link, which)) return parse_qs(url.query)['page'][0] - except KeyError: # no page in query + except KeyError: # GitHub gave us only one results page return 1 @staticmethod - def trailing_zeroes(data): + def _trailing_zeroes(data): """Count length of klongest all 0 suffix in @data""" - cnt = 0 - for letter in reversed(data): - if letter == '0': - cnt += 1 - else: - break - return cnt + for cnt, letter in enumerate(reversed(data)): + if letter != '0': + return cnt + return 0 + + def _render_github_source_url(self, repo, ref): + """From API knowledge, create a download link. + There are quite a few indirections involved, when downloading + source code from GitHub. With this method we build what we + think is a good download link for long term storage. + """ + url = repo['archive_url'].replace('{/ref}', '{ref}', 1) + url = url.format(archive_format='zipball', ref='/'+ref) + + res = self.github_request(url, self.github_name, self.github_token, + return_response=True, allow_redirects=False) + if res.status_code != 302: # if this is ever true, we must not assume + res.raise_for_status() # our approach is still sensible + return url.replace( + "https://api.github.com/repos", "https://github.com" + ).replace( + "zipball/refs/tags", "archive/refs/tags" + ) + '.zip' def version_to_github_tag(self, version, github_ref, version_prefix=None): """Heuristics to find a tag in GitHub that corresponds to @version in the project identified by @github_ref. 
         First we must normalize @github_ref, because we are unsure
-        what is actually passed as this paramter. Using urlparse()
-        we save ourselves a little bit of work with trailing
-        queries and fragments, but any @github_ref with colons where
-        the first colon is not part of '://' will not yield viable
-        results, e.g. 'api.github.com:443/repos/sw360/capycli'.
+        what is actually passed as this parameter.
+
+        The first check for each retrieved tag is the original
+        get_matching_tag() and all the guessing happens afterwards.
+        This has the effect that if our guessing does not yield any
+        viable results, the algorithm implicitly falls back to checking
+        every tag with get_matching_tag(), which is the behaviour of
+        the previous implementation.
 
-        Then we start guessing tags.
+        If get_matching_tag() does not yield a positive result, we
+        start guessing tags:
         We only care about such tags that produce a non empty match
         with self.version_regex, because only these would ever yield
-        accepted compare() results in get_matching_tag(). Every such
-        tag can be read as a fixed prefix, a substring as matched by
-        self.version_regex followed by a suffix. Usually, prefix
-        will be "v" and the suffix will be empty, but sometimes tags
-        are more elaborate.
-        We expect the only the regex-matchable part of a tag changes
+        accepted compare() results in get_matching_tag().
+        Every such tag can be read as a fixed prefix, followed by a
+        substring as matched by self.version_regex, followed by a
+        fixed suffix. Usually, the prefix will be "v" and the suffix
+        will be empty, but sometimes tags are more elaborate.
+        We expect only the regex-matchable part of a tag changes
        from version to version, while the prefix and the suffix are
         static.
         Given a tag with a static prefix, a static suffix and a
         self.version_regex-matchable substring, we can generate
-        tag names from semantic versions, by reverseing the logic
-        implemented in to_semver_string().
+        tag names from semantic versions, by reversing the logic
+        in to_semver_string().
         Comparing the original matchable substring, to the result of
         to_semver_string() we should be able to generate similar
         matchable substrings from @version.
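+        Example: for a tag like 'foo01_02_03bar' the matchable
+        substring is '01_02_03', so the prefix is 'foo' and the
+        suffix is 'bar'; looking for version 4.5.6 we would then
+        guess candidates such as 'foo04_05_06bar'.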
""" - matching_tag = '' semcmp = self.to_semver_string(version).split('.') repo = self._get_github_repo(github_ref) print(f"version_to_github_tag: {github_ref} -> {repo['tags_url']}") - url = repo['tags_url'] +'?per_page=100' res = self.github_request(url, self.github_name, self.github_token, return_response=True) pages = self._get_link_page(res.headers.get('link', ''), 'last') - try: - for _ in range(pages): - # note: in res.json() we already have the first page - tags = [tag['name'] for tag in res.json()] - prefix = None - suffix = None - for tag in tags: - # ex: tag['name'] = foo01_02_03bar + prefix, suffix = None, None # tag parts not relevant to versioning + for _ in range(pages): # we prefer this over "while True" + # note: in res.json() we already have the first results page + tags = {tag['name']:tag for tag in res.json()} + for tag in tags: + matching_tag = self.get_matching_tag([tags[tag]], version, url) + if len(matching_tag) > 0: # we found what we believe is + return matching_tag # the correct tag + # if we have seen this _prefix and _suffix, + # we already tried to guess the tag and failed + # no need to try again + best_guess = self.version_regex.search(tag) + best_guess = tag if best_guess is None else best_guess.group(0) + _prefix, _suffix = tag.split(best_guess, 1) + if prefix == _prefix and suffix == _suffix: + continue + prefix, suffix = _prefix, _suffix + + # reverse engineer a best_guess from @version + engineered_guess = [] + remainder = best_guess + semtag = self.to_semver_string(tag).split('.') + # IIRC to_semver_string() can return versions with more + # than 3 components + for tag_ver, param_ver in zip(semtag, semcmp): try: - best_guess = self.version_regex.search(tag).group(0) - except AttributeError as err: - print(f'AttributeError {err}') - print(f' tag: {tag}') - best_guess = tag - # ex => best_guess = 01_02_03 - _prefix, _suffix = tag.split(best_guess, 1) - # ex => _prefix = foo, _suffix = bar - if prefix == _prefix and suffix == _suffix: - continue - prefix, suffix = _prefix, _suffix - - semtag = self.to_semver_string(tag).split('.') - # ex => semtag = ['1', '2', '3'] - - # reverse engineer version string in tag - # ex => semcmp = ['4', '5', '6'] - ver_str_parts = [] - remainder = best_guess - # if i read that correctly to_semver_string() can return - # versions with more than 3 components - for tag_ver, param_ver in zip(semtag, semcmp): - try: - chunk, remainder = remainder.split(tag_ver, 1) - leading_zeroes = self.trailing_zeroes(chunk) - delta_digits = len(param_ver) - len(tag_ver) - if leading_zeroes > 0 \ - and leading_zeroes - delta_digits > 0: - chunk = chunk[:-leading_zeroes]\ - + '0' * (leading_zeroes - delta_digits) - ver_str_parts.append(chunk) - ver_str_parts.append(param_ver) - # ex => ver_str_parts = ['0', '4', '_0', '5', '_0', '6'] - except ValueError as err: - # sometimes there are wonky tags that are not - # even meant to be a release - # print(f'ValueError {err}') - # print(f' best_guess: {best_guess}') - # print(f' remainder: {remainder}') - # print(' ', semtag, semcmp) - # print(' ', tag_ver, param_ver) - # print(' ', ver_str_parts) - pass - - guesses = { # avoid generating duplicates - prefix + ''.join(ver_str_parts) + suffix, # ex => foo04_05_06bar - prefix + ''.join(ver_str_parts), - 'v' + ''.join(ver_str_parts), - ''.join(ver_str_parts), - prefix + '.'.join(semcmp) + suffix, - prefix + '.'.join(semcmp), - 'v' + '.'.join(semcmp), - '.'.join(semcmp), - } - for guess in guesses: - print(f'version_to_github_tag: {tag} {guess} ', end='') - 
if guess in tags: - print('found on current page') - matching_tag = guess - raise StopIteration() - url = repo['git_refs_url'].replace('{/sha}', '{sha}', 1) - url = url.format(sha='/tags/' + guess) - res = self.github_request(url, self.github_name, - self.github_token, return_response=True) - if res.status_code == 200: - print('is a valid tag') - matching_tag = guess - raise StopIteration() - print(':-(') - - try: - url = self._get_link(res.headers['link'], 'next') - if url is None: - raise StopIteration() - except KeyError as err: - raise StopIteration() from err + chunk, remainder = remainder.split(tag_ver, 1) + leading_zeroes = self._trailing_zeroes(chunk) + delta_digits = len(param_ver) - len(tag_ver) + if leading_zeroes > 0 \ + and leading_zeroes - delta_digits > 0: + chunk = chunk[:-leading_zeroes]\ + + '0' * (leading_zeroes - delta_digits) + engineered_guess.append(chunk) + engineered_guess.append(param_ver) + except ValueError as err: + # sometimes there are wonky tags that are not + # even meant to be a release + pass + + guesses = { # set to void generating duplicates + prefix + ''.join(engineered_guess) + suffix, + prefix + ''.join(engineered_guess), + 'v' + ''.join(engineered_guess), + ''.join(engineered_guess), + prefix + '.'.join(semcmp) + suffix, + prefix + '.'.join(semcmp), + 'v' + '.'.join(semcmp), + '.'.join(semcmp), + } + for guess in guesses: + print(f'version_to_github_tag: {tag} {guess} ', end='') + if guess in tags: + print('found on current page') + return self._render_github_source_url(repo, guess) + url = repo['git_refs_url'].replace('{/sha}', '{sha}', 1) + url = url.format(sha='/tags/' + guess) + res = self.github_request(url, self.github_name, + self.github_token, return_response=True) + if res.status_code == 200: + print('is a valid tag') + return self._render_github_source_url(repo, guess) + print(':-(') + try: + url = self._get_link(res.headers['link'], 'next') res = self.github_request(url, self.github_name, self.github_token, return_response=True) print('version_to_github_tag: next page!') - except StopIteration: - pass - - if matching_tag == '': - print_yellow(" No matching tag for version " + version + " found ") - return "" - - url = repo['archive_url'].replace('{/ref}', '{ref}', 1) - url = url.format(archive_format='tarball', ref='/'+matching_tag) - res = self.github_request(url, self.github_name, self.github_token, - return_response=True, allow_redirects=False) - try: - return res.headers['location'] - except KeyError: - print(f'No location in response! 
{url} {res.status_code}') - print(res.headers) - return "" # fail + except KeyError as err: # no more result pages + break + print_yellow(" No matching tag for version " + version + " found") + return "" From 0883f410321efee10364e2489b0ad4401cfef276 Mon Sep 17 00:00:00 2001 From: Martin Metzker <60664558+16Martin@users.noreply.github.com> Date: Sat, 16 Nov 2024 13:06:16 +0100 Subject: [PATCH 3/4] fixup: use requests link property --- capycli/bom/findsources.py | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/capycli/bom/findsources.py b/capycli/bom/findsources.py index 15edb4e..c5d8b59 100644 --- a/capycli/bom/findsources.py +++ b/capycli/bom/findsources.py @@ -190,27 +190,19 @@ def _get_github_repo(self, github_ref): url = 'https://' + url.replace('//', '/') repo = {} while 'tags_url' not in repo and 'github.com' in url: + print('DEBUG running github_request') + print(f'DEBUG self.github_request({url}, {self.github_name}, {self.github_token})') repo = self.github_request(url, self.github_name, self.github_token) + print(f'DEBUG repo {repo}') url = url.rsplit('/', 1)[0] # remove last path segment if 'tags_url' not in repo: raise ValueError(f"Unable to make @github_ref {github_ref} work!") return repo - def _get_link(self, link, which='next'): - """Helper to read link-Header from GitHub API responses. - Each URL in the Link-header is labelled with a relation, - usually, first, last, prev or next. We use this method to - retrieve the URL by its relation's name. - """ - for match in self.github_header_link_regex.findall(link): - if which in match[2:5]: - return match[0] - raise KeyError(f'No link to {which}-page in response!') - - def _get_link_page(self, link, which='next'): + def _get_link_page(self, res, which='next'): """Helper to only get the referenced page number a link-header-URL.""" try: - url = urlparse(self._get_link(link, which)) + url = urlparse(res.links[which]['url']) return parse_qs(url.query)['page'][0] except KeyError: # GitHub gave us only one results page return 1 @@ -283,7 +275,7 @@ def version_to_github_tag(self, version, github_ref, version_prefix=None): url = repo['tags_url'] +'?per_page=100' res = self.github_request(url, self.github_name, self.github_token, return_response=True) - pages = self._get_link_page(res.headers.get('link', ''), 'last') + pages = self._get_link_page(res) prefix, suffix = None, None # tag parts not relevant to versioning for _ in range(pages): # we prefer this over "while True" # note: in res.json() we already have the first results page @@ -348,7 +340,7 @@ def version_to_github_tag(self, version, github_ref, version_prefix=None): return self._render_github_source_url(repo, guess) print(':-(') try: - url = self._get_link(res.headers['link'], 'next') + url = res.links['next']['url'] res = self.github_request(url, self.github_name, self.github_token, return_response=True) print('version_to_github_tag: next page!') From 72e1fb47516fbcc9417ac4f0901c8701d09e6f34 Mon Sep 17 00:00:00 2001 From: Martin Metzker <60664558+16Martin@users.noreply.github.com> Date: Sat, 16 Nov 2024 13:06:46 +0100 Subject: [PATCH 4/4] fix: make unittests work --- tests/test_find_sources.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/tests/test_find_sources.py b/tests/test_find_sources.py index 40a1cbb..4c48921 100644 --- a/tests/test_find_sources.py +++ b/tests/test_find_sources.py @@ -6,10 +6,12 @@ # SPDX-License-Identifier: MIT # 
-------------------------------------------------------------------------------
+import json
 import os
 from typing import Any, Dict, List
 from unittest.mock import MagicMock, patch
 
+import requests
 import responses
 
 import capycli.common.json_support
@@ -87,6 +89,7 @@ def test_file_invalid(self) -> None:
 
     def mock_github_request_side_effect(self, url: str, username: str = "", token: str = "") -> Any:
         # Define different mock responses based on the URL
+        print(f'\nmock_github_request({url})')
         if url == 'https://api.github.com/repos/tartley/colorama/tags?per_page=100&page=1':
             return [
                 {
@@ -328,13 +331,22 @@ def test_get_pkg_go_repo_url_error(self, mock_requests_get: Any) -> None:
 
     @patch('capycli.bom.findsources.FindSources.get_matching_tag')
     def test_find_golang_url_github(self, mock_get_github_info: Any, mock_get_matching_tag: Any) -> None:
         # Mocking a GitHub scenario
+        tags_first_page = requests.Response()
+        tags_first_page._content = json.dumps(
+            [{'name': 'unittest_v1.0.1_unittest'}]
+        ).encode()
         mock_get_github_info.return_value = 'https://pkg.go.dev/github.com/opencontainers/runc'
         mock_get_matching_tag.return_value = 'https://github.com/opencontainers/runc/archive/refs/tags/v1.0.1.zip'
         find_sources = FindSources()
         component = MagicMock()
         component.name = 'github.com/opencontainers/runc'
         component.version = 'v1.0.1'
-        source_url = find_sources.find_golang_url(component)
+        with patch.object(find_sources, 'github_request') as call:
+            call.side_effect = (
+                {'tags_url': 'https://api.github.com/repos/opencontainers/runc/tags'},
+                tags_first_page,
+            )
+            source_url = find_sources.find_golang_url(component)
 
         self.assertEqual(source_url, 'https://pkg.go.dev/github.com/opencontainers/runc')
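For reference, a minimal standalone sketch of the tag-guessing heuristic described in the version_to_github_tag() docstring above, assuming the same version_regex pattern; the helper name guess_tag and the simplified separator handling are illustrative only, and unlike the patch it does not preserve zero-padding (which _trailing_zeroes() handles):

import re

# same pattern as FindSources.version_regex in findsources.py
VERSION_REGEX = re.compile(r"[\d+\.|_]+[\d+]")


def guess_tag(known_tag, wanted_parts):
    """Rebuild a candidate tag for @wanted_parts (e.g. ['4', '5', '6'])
    from the structure of a @known_tag (e.g. 'foo01_02_03bar')."""
    match = VERSION_REGEX.search(known_tag)
    if match is None:
        return ''
    core = match.group(0)                      # '01_02_03'
    prefix, suffix = known_tag.split(core, 1)  # 'foo', 'bar'
    # keep the known tag's separators, swap in the wanted version parts
    separators = re.split(r'\d+', core)        # ['', '_', '_', '']
    rebuilt = ''.join(sep + part
                      for sep, part in zip(separators, wanted_parts))
    return prefix + rebuilt + suffix


print(guess_tag('foo01_02_03bar', ['4', '5', '6']))  # foo4_5_6bar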