Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: FindSources w/o always fetching all tags #102

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 229 additions & 9 deletions capycli/bom/findsources.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import sys
import time
from typing import Any, Dict, List, Tuple
from urllib.parse import urlparse, parse_qs

import requests
import semver
Expand Down Expand Up @@ -40,6 +41,7 @@ def __init__(self) -> None:
self.verbose: bool = False
self.version_regex = re.compile(r"[\d+\.|_]+[\d+]")
self.github_project_name_regex = re.compile(r"^[a-zA-Z0-9-]+(/[a-zA-Z0-9-]+)*$")
self.github_header_link_regex = re.compile(r'<([^>]*)>\s*;\s*rel\s*=\s*(("[^"]*")|(\'[^\']*\')|([^ ]*))')
self.github_name: str = ""
self.github_token: str = ""
self.sw360_url: str = os.environ.get("SW360ServerUrl", "")
Expand Down Expand Up @@ -70,14 +72,17 @@ def is_sourcefile_accessible(self, sourcefile_url: str) -> bool:
return False

@staticmethod
def github_request(url: str, username: str = "", token: str = "") -> Any:
def github_request(url: str, username: str = "", token: str = "",
return_response: bool = False,
allow_redirects: bool = True, # default in requests
) -> Any:
try:
headers = {}
if token:
headers["Authorization"] = "token " + token
if username:
headers["Username"] = username
response = requests.get(url, headers=headers)
response = requests.get(url, headers=headers, allow_redirects=allow_redirects)
if not response.ok:
if response.status_code == 429 or \
'rate limit exceeded' in response.reason or \
Expand All @@ -87,17 +92,16 @@ def github_request(url: str, username: str = "", token: str = "") -> Any:
" Github API rate limit exceeded - wait 60s and retry ... " +
Style.RESET_ALL)
time.sleep(60)
return FindSources.github_request(url, username, token)

return response.json()
return FindSources.github_request(url, username, token, return_response=return_response)

except Exception as ex:
print(
Fore.LIGHTYELLOW_EX +
" Error accessing GitHub: " + repr(ex) +
Style.RESET_ALL)

return {}
response = requests.Response()
response._content = f'{"exception": "{repr(ex)}"}'.encode()
return response if return_response else response.json()

@staticmethod
def get_repositories(name: str, language: str, username: str = "", token: str = "") -> Any:
Expand Down Expand Up @@ -157,6 +161,213 @@ def get_github_info(repository_url: str, username: str = "",
tags.extend(tmp)
return tags

def _get_github_repo(self, github_ref):
    """Fetch the GitHub API repository object identified by @github_ref.

    @github_ref may be a plain "<owner>/<repo>" string or any of the
    many links that refer to a project on GitHub: a web URL, a clone
    URL ending in ".git", an API URL, or a URL pointing below the
    repository such as its tags URL.

    urlparse() takes care of trailing queries and fragments, but a
    @github_ref with a colon that is not part of '://' is not
    interpreted as intended,
    e.g. 'api.github.com:443/repos/sw360/capycli'.

    Starting with the full path, trailing path segments are dropped
    one at a time until self.github_request() answers with an object
    that contains 'tags_url'. Paths shorter than "<owner>/<repo>" are
    never requested, because they cannot name a repository.

    Returns the repository object (a dict containing 'tags_url').
    Raises ValueError if @github_ref carries a scheme but does not
    point to github.com, or if no repository object could be fetched.
    """
    gh_ref = urlparse(github_ref, scheme='no_scheme_provided')
    path = gh_ref.path
    if gh_ref.scheme != 'no_scheme_provided':
        host = gh_ref.hostname or ''
        if host != 'github.com' and not host.endswith('.github.com'):
            raise ValueError(f'{github_ref} is not an expected @github_ref!')
        # Strip the API prefix only when it is a complete path segment,
        # so an owner such as "reposurgeon" is left untouched.
        if path == '/repos' or path.startswith('/repos/'):
            path = path[len('/repos'):]
    # else: no scheme given, interpret @github_ref as OWNER/REPO

    segments = [part for part in path.split('/') if part]
    if segments and segments[-1].endswith('.git'):
        segments[-1] = segments[-1][:-4]
        if not segments[-1]:
            segments.pop()

    # NOTE: the request is deliberately not logged here, the token is
    # a credential and must not end up on stdout.
    while len(segments) >= 2:
        url = 'https://api.github.com/repos/' + '/'.join(segments)
        repo = self.github_request(url, self.github_name, self.github_token)
        if isinstance(repo, dict) and 'tags_url' in repo:
            return repo
        segments.pop()  # remove last path segment and retry
    raise ValueError(f"Unable to make @github_ref {github_ref} work!")

def _get_link_page(self, res, which='next'):
    """Return the page number referenced by a Link-header relation.

    @res is a response object whose `links` mapping holds the parsed
    Link header; @which selects the relation to look at, e.g. 'next'
    or 'last'.

    The page number is always returned as an int, so callers can use
    it in arithmetic or range(). If the relation or its 'page' query
    parameter is missing (GitHub gave us only one results page), or
    the parameter is not a number, 1 is returned.
    """
    try:
        url = urlparse(res.links[which]['url'])
        # parse_qs() yields strings; convert so both exits return int
        return int(parse_qs(url.query)['page'][0])
    except (KeyError, ValueError):
        return 1

@staticmethod
def _trailing_zeroes(data):
    """Count the length of the longest all-'0' suffix in @data.

    Returns 0 when @data is empty or does not end in '0', and the
    full length of @data when it consists of '0' characters only.
    """
    count = 0
    for letter in reversed(data):
        if letter != '0':
            break
        count += 1
    return count

def _render_github_source_url(self, repo, ref):
    """From API knowledge, create a download link.

    There are quite a few indirections involved when downloading
    source code from GitHub. With this method we build what we think
    is a good download link for long term storage.

    @repo is a repository API object providing 'archive_url'; @ref is
    a tag name, optionally prefixed with 'refs/tags/'.

    The API archive URL for @ref is requested without following
    redirects and is expected to answer with a 302. Any other status
    is passed to raise_for_status(), which raises for HTTP error
    codes.

    Returns a link of the form
    https://github.com/<owner>/<repo>/archive/refs/tags/<tag>.zip
    """
    probe_url = repo['archive_url'].replace('{/ref}', '{ref}', 1)
    probe_url = probe_url.format(archive_format='zipball', ref='/' + ref)

    res = self.github_request(probe_url, self.github_name, self.github_token,
                              return_response=True, allow_redirects=False)
    if res.status_code != 302:  # if this is ever true, we must not assume
        res.raise_for_status()  # our approach is still sensible

    # Build the link from the bare tag name, so a plain tag and a
    # fully qualified 'refs/tags/<tag>' render the same URL.
    tag_prefix = 'refs/tags/'
    tag = ref[len(tag_prefix):] if ref.startswith(tag_prefix) else ref
    project_url = repo['archive_url'].split('/{archive_format}', 1)[0]
    project_url = project_url.replace(
        "https://api.github.com/repos", "https://github.com", 1)
    return project_url + '/archive/refs/tags/' + tag + '.zip'

def version_to_github_tag(self, version, github_ref, version_prefix=None):
    """Heuristics to find a tag in GitHub that corresponds to
    @version in the project identified by @github_ref.

    First we must normalize @github_ref, because we are unsure
    what is actually passed as this parameter.

    The first check for each retrieved tag is the original
    get_matching_tag() and all the guessing happens afterwards.
    This has the effect that if our guessing does not yield any
    viable results, this algorithm implicitly falls back to checking
    every tag with get_matching_tag(), which is the behaviour of
    the previous implementation.

    If get_matching_tag() did not yield a positive result, we
    start guessing tags:
    We only care about such tags that produce a non-empty match
    with self.version_regex, because only these would ever yield
    accepted compare() results in get_matching_tag().
    Every such tag can be read as a fixed prefix, followed by a
    substring as matched by self.version_regex, followed by a
    fixed suffix. Usually, the prefix will be "v" and the suffix
    will be empty, but sometimes tags are more elaborate.
    We expect that only the regex-matchable part of a tag changes
    from version to version, while the prefix and the suffix are
    static.
    Given a tag with a static prefix, a static suffix and a
    self.version_regex-matchable substring, we can generate
    tag names from semantic versions by reversing the logic
    in to_semver_string().
    Comparing the original matchable substring to the result of
    to_semver_string(), we should be able to generate similar
    matchable substrings from @version.

    @version_prefix is accepted for signature compatibility and is
    currently not used.

    Returns the result of get_matching_tag() if it matched a tag,
    otherwise the link rendered by _render_github_source_url() for
    a guessed tag that exists, otherwise "".
    """
    semcmp = self.to_semver_string(version).split('.')
    repo = self._get_github_repo(github_ref)
    print(f"version_to_github_tag: {github_ref} -> {repo['tags_url']}")

    tags_url = repo['tags_url'] + '?per_page=100'
    page = self.github_request(tags_url, self.github_name,
                               self.github_token, return_response=True)
    # The 'last' relation tells how many result pages exist; int()
    # because the page number may be handed back as a string.
    max_pages = int(self._get_link_page(page, 'last'))
    prefix, suffix = None, None  # tag parts not relevant to versioning
    for _ in range(max_pages):  # we prefer this over "while True"
        # note: in page.json() we already have the first results page
        payload = page.json()
        if not isinstance(payload, list):
            break  # not a list of tags, e.g. an error object
        tags = {tag['name']: tag for tag in payload}
        for tag, tag_info in tags.items():
            matching_tag = self.get_matching_tag([tag_info], version, tags_url)
            if matching_tag:  # we found what we believe is
                return matching_tag  # the correct tag

            # if we have seen this _prefix and _suffix,
            # we already tried to guess the tag and failed
            # no need to try again
            match = self.version_regex.search(tag)
            best_guess = tag if match is None else match.group(0)
            _prefix, _suffix = tag.split(best_guess, 1)
            if prefix == _prefix and suffix == _suffix:
                continue
            prefix, suffix = _prefix, _suffix

            # reverse engineer a best_guess from @version
            engineered_guess = []
            remainder = best_guess
            semtag = self.to_semver_string(tag).split('.')
            # to_semver_string() may return versions with more than
            # 3 components; zip() stops at the shorter of the two
            for tag_ver, param_ver in zip(semtag, semcmp):
                try:
                    chunk, remainder = remainder.split(tag_ver, 1)
                except ValueError:
                    # sometimes there are wonky tags that are not
                    # even meant to be a release; skip this component
                    continue
                padding = self._trailing_zeroes(chunk)
                if padding > 0:
                    # keep the padded width of the component: drop
                    # one '0' per extra digit, never below none
                    delta_digits = len(param_ver) - len(tag_ver)
                    chunk = chunk[:-padding] \
                        + '0' * max(0, padding - delta_digits)
                engineered_guess.append(chunk)
                engineered_guess.append(param_ver)

            engineered = ''.join(engineered_guess)
            plain = '.'.join(semcmp)
            # dict.fromkeys() removes duplicates and keeps the order,
            # so the most specific guess is always probed first
            guesses = list(dict.fromkeys([
                prefix + engineered + suffix,
                prefix + engineered,
                'v' + engineered,
                engineered,
                prefix + plain + suffix,
                prefix + plain,
                'v' + plain,
                plain,
            ]))
            for guess in guesses:
                print(f'version_to_github_tag: {tag} {guess} ', end='')
                if guess in tags:
                    print('found on current page')
                    return self._render_github_source_url(repo, guess)
                # separate names: the tags page and its URL are still
                # needed for the remaining tags and for pagination
                ref_url = repo['git_refs_url'].replace('{/sha}', '{sha}', 1)
                ref_url = ref_url.format(sha='/tags/' + guess)
                ref_res = self.github_request(ref_url, self.github_name,
                                              self.github_token,
                                              return_response=True)
                if ref_res.status_code == 200:
                    print('is a valid tag')
                    return self._render_github_source_url(repo, guess)
                print(':-(')
        try:
            next_url = page.links['next']['url']
        except KeyError:  # no more result pages
            break
        page = self.github_request(next_url, self.github_name,
                                   self.github_token, return_response=True)
        print('version_to_github_tag: next page!')
    print_yellow("      No matching tag for version " + version + " found")
    return ""




















def to_semver_string(self, version: str) -> str:
"""Bring all version information to a format we can compare."""
result = self.version_regex.search(version)
Expand Down Expand Up @@ -194,7 +405,10 @@ def find_github_url(self, component: Component, use_language: bool = True) -> st
if len(name_match):
for match in name_match:
tag_info = self.github_request(match["tags_url"], self.github_name, self.github_token)
print(f'find_github_url version_to_github_tag {component.version} {match["tags_url"]}')
new_style = self.version_to_github_tag(component.version, match["tags_url"])
source_url = self.get_matching_tag(tag_info, component.version or "", match["html_url"])
print(f'UPGRADE find_github_url{new_style == source_url} old({source_url}) new({new_style})')
if len(name_match) == 1:
return source_url
elif source_url:
Expand Down Expand Up @@ -261,10 +475,13 @@ def find_golang_url(self, component: Component) -> str:

if repository_name.startswith("https://github.com/"):
repository_name = repository_name[len("https://github.com/"):]
print(f'find_golang_url version_to_github_tag {component_version} {repository_name}')
new_style = self.version_to_github_tag(component_version, repository_name)
tag_info = self.get_github_info(repository_name, self.github_name, self.github_token)
tag_info_checked = self.check_for_github_error(tag_info)
source_url = self.get_matching_tag(tag_info_checked, component_version,
repository_name, version_prefix or "")
print(f'UPGRADE {new_style == source_url} old({source_url}) new({new_style})')

# component["RepositoryUrl"] = repository_name
return source_url
Expand All @@ -284,10 +501,13 @@ def get_github_source_url(self, github_url: str, version: str) -> str:

if self.verbose:
print_text(" repo_name:", repo_name)

print(f'get_github_source_url version_to_github_tag {version} {repo_name}')
new_style = self.version_to_github_tag(version, repo_name)
tag_info = self.get_github_info(repo_name, self.github_name, self.github_token)
tag_info_checked = self.check_for_github_error(tag_info)
return self.get_matching_tag(tag_info_checked, version, github_url)
source_url = self.get_matching_tag(tag_info_checked, version, github_url)
print(f'UPGRADE {new_style == source_url} old({source_url}) new({new_style})')
return source_url

def check_for_github_error(self, tag_info: get_github_info_type) -> List[Dict[str, Any]]:
if isinstance(tag_info, list):
Expand Down
18 changes: 17 additions & 1 deletion tests/test_find_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
# SPDX-License-Identifier: MIT
# -------------------------------------------------------------------------------

import json
import os
from typing import Any, Dict, List
from unittest.mock import MagicMock, patch

import requests
import responses

import capycli.common.json_support
Expand Down Expand Up @@ -87,6 +89,7 @@ def test_file_invalid(self) -> None:

def mock_github_request_side_effect(self, url: str, username: str = "", token: str = "") -> Any:
# Define different mock responses based on the URL
print(f'\nmock_github_request({url})')
if url == 'https://api.github.com/repos/tartley/colorama/tags?per_page=100&page=1':
return [
{
Expand Down Expand Up @@ -328,13 +331,26 @@ def test_get_pkg_go_repo_url_error(self, mock_requests_get: Any) -> None:
@patch('capycli.bom.findsources.FindSources.get_matching_tag')
def test_find_golang_url_github(self, mock_get_github_info: Any, mock_get_matching_tag: Any) -> None:
# Mocking a GitHub scenario
tags_first_page = requests.Response()
tags_first_page._content = json.dumps(
[{'name': f'unittest_v1.0.1_unittest'}]
).encode()
mock_get_github_info.return_value = 'https://pkg.go.dev/github.com/opencontainers/runc'
mock_get_matching_tag.return_value = 'https://github.com/opencontainers/runc/archive/refs/tags/v1.0.1.zip'
mock_github_request.side_effect = (
{'tags_url': 'https://api.github.com/repos/opencontainers/runc/tags'},
tags_first_page,
)
find_sources = FindSources()
component = MagicMock()
component.name = 'github.com/opencontainers/runc'
component.version = 'v1.0.1'
source_url = find_sources.find_golang_url(component)
with patch.object(find_sources, 'github_request') as call:
call.side_effect = (
{'tags_url': 'https://api.github.com/repos/opencontainers/runc/tags'},
tags_first_page,
)
source_url = find_sources.find_golang_url(component)

self.assertEqual(source_url, 'https://pkg.go.dev/github.com/opencontainers/runc')

Expand Down
Loading