Skip to content

Commit

Permalink
Merge pull request #380 from DataDog/s.obregoso/feat_honor_requiremen…
Browse files Browse the repository at this point in the history
…ts_versions

Feature: honor requirements versions
  • Loading branch information
sobregosodd authored Jun 14, 2024
2 parents 6b93029 + e210386 commit 1ee3cad
Show file tree
Hide file tree
Showing 8 changed files with 245 additions and 139 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ wheels/
*.egg-info/
build/
.coverage*

.cache
.semgrep
3 changes: 1 addition & 2 deletions guarddog/analyzer/metadata/npm/typosquatting.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
from typing import Optional

from guarddog.analyzer.metadata.typosquatting import TyposquatDetector
from guarddog.utils.config import TOP_PACKAGES_CACHE_LOCATION
import requests

TOP_PACKAGES_CACHE_LOCATION = os.environ.get("GUARDDOG_TOP_PACKAGES_CACHE_LOCATION")


class NPMTyposquatDetector(TyposquatDetector):
"""Detector for typosquatting attacks. Detects if a package name is a typosquat of one of the top 5000 packages.
Expand Down
5 changes: 1 addition & 4 deletions guarddog/analyzer/metadata/pypi/typosquatting.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@
import packaging.utils

from guarddog.analyzer.metadata.typosquatting import TyposquatDetector

from guarddog.utils.config import TOP_PACKAGES_CACHE_LOCATION

log = logging.getLogger("guarddog")


TOP_PACKAGES_CACHE_LOCATION = os.environ.get('GUARDDOG_TOP_PACKAGES_CACHE_LOCATION')


class PypiTyposquatDetector(TyposquatDetector):
"""
Detector for typosquatting attacks. Detects if a package name is a typosquat of one of the top 1000 packages.
Expand Down
97 changes: 68 additions & 29 deletions guarddog/scanners/npm_project_scanner.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,15 @@
import json
import logging

import requests
from semantic_version import NpmSpec, Version # type:ignore

from guarddog.utils.config import VERIFY_EXHAUSTIVE_DEPENDENCIES
from guarddog.scanners.npm_package_scanner import NPMPackageScanner
from guarddog.scanners.scanner import ProjectScanner

log = logging.getLogger("guarddog")


def find_all_versions(package_name: str, semver_range: str) -> set[str]:
    """
    Retrieves the versions of an npm package that satisfy a semver selector

    Args:
        package_name (str): name of the package on the npm registry
        semver_range (str): npm-style version selector
    Returns:
        set[str]: the matching versions; an empty set when the registry
        does not answer with status 200; the raw selector alone when it
        cannot be parsed as a semver range
    """
    url = f"https://registry.npmjs.org/{package_name}"
    log.debug(f"Retrieving npm package metadata from {url}")
    response = requests.get(url)
    if response.status_code != 200:
        log.debug(f"No version available, status code {response.status_code}")
        return set()

    # Tolerate a metadata document without a "versions" entry instead of
    # failing with KeyError
    versions = list(response.json().get("versions", {}))
    log.debug(f"Retrieved versions {', '.join(versions)}")

    try:
        npm_spec = NpmSpec(semver_range)
    except ValueError:  # not a semver range, let's keep it raw
        return {semver_range}

    result = set()
    for v in versions:
        try:
            parsed = Version(v)
        except ValueError:
            # One malformed version must not abort the whole lookup
            log.debug(f"Ignoring non-semver version {v}")
            continue
        if parsed in npm_spec:
            result.add(v)
    return result


class NPMRequirementsScanner(ProjectScanner):
"""
Scans all packages in the package.json file of a project
Expand All @@ -45,12 +22,69 @@ def __init__(self) -> None:
super().__init__(NPMPackageScanner())

def parse_requirements(self, raw_requirements: str) -> dict:
"""
Parses package.json specification and finds all valid
versions of each dependency
Args:
raw_requirements (str): contents of package file
Returns:
dict: mapping of dependencies to valid versions
ex.
{
....
<dependency-name>: [0.0.1, 0.0.2, ...],
...
}
"""
package = json.loads(raw_requirements)
dependencies = package["dependencies"] if "dependencies" in package else {}
dev_dependencies = package["devDependencies"] if "devDependencies" in package else {}
dev_dependencies = (
package["devDependencies"] if "devDependencies" in package else {}
)

def get_matched_versions(versions: set[str], semver_range: str) -> set[str]:
    """
    Retrieves all versions that match a given semver selector

    Args:
        versions (set[str]): versions available for the package
        semver_range (str): npm-style semver selector
    Returns:
        set[str]: the matching versions, reduced to the highest one when
        VERIFY_EXHAUSTIVE_DEPENDENCIES is falsy; the raw selector alone
        when it cannot be parsed as a semver range
    """
    # Only a malformed selector falls back to the raw value
    try:
        spec = NpmSpec(semver_range)
    except ValueError:
        # use it raw
        return {semver_range}

    matched = []
    for candidate in versions:
        try:
            parsed = Version(candidate)
        except ValueError:
            # A malformed version is skipped rather than being mistaken
            # for a malformed selector
            continue
        if spec.match(parsed):
            matched.append(parsed)

    # If just the best matched version scan is required we only keep one
    if not VERIFY_EXHAUSTIVE_DEPENDENCIES and matched:
        matched = [max(matched)]

    return {str(v) for v in matched}

def find_all_versions(package_name: str) -> set[str]:
    """
    This helper function retrieves all versions available for the package

    Args:
        package_name (str): name of the package on the npm registry
    Returns:
        set[str]: every version listed in the registry metadata; an empty
        set when the registry does not answer with status 200
    """
    url = f"https://registry.npmjs.org/{package_name}"
    log.debug(f"Retrieving npm package metadata from {url}")
    response = requests.get(url)
    if response.status_code != 200:
        log.debug(f"No version available, status code {response.status_code}")
        return set()

    data = response.json()
    # Tolerate a metadata document without a "versions" entry instead of
    # failing with KeyError
    versions = set(data.get("versions", {}))
    log.debug(f"Retrieved versions {', '.join(versions)}")
    return versions

merged = {} # type: dict[str, set[str]]
for package, selector in list(dependencies.items()) + list(dev_dependencies.items()):
for package, selector in list(dependencies.items()) + list(
dev_dependencies.items()
):
if package not in merged:
merged[package] = set()
merged[package].add(selector)
Expand All @@ -59,7 +93,12 @@ def parse_requirements(self, raw_requirements: str) -> dict:
for package, all_selectors in merged.items():
versions = set() # type: set[str]
for selector in all_selectors:
versions = versions.union(find_all_versions(package, selector))
if len(versions) > 0:
results[package] = versions
versions = versions.union(
get_matched_versions(find_all_versions(package), selector)
)
if len(versions) == 0:
log.error(f"Package/Version {package} not on NPM\n")
continue

results[package] = versions
return results
122 changes: 63 additions & 59 deletions guarddog/scanners/pypi_project_scanner.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import logging
import re
import sys

import pkg_resources
import requests
from packaging.specifiers import Specifier, Version

from guarddog.scanners.pypi_package_scanner import PypiPackageScanner
from guarddog.scanners.scanner import ProjectScanner
from guarddog.utils.config import VERIFY_EXHAUSTIVE_DEPENDENCIES

log = logging.getLogger("guarddog")

Expand Down Expand Up @@ -47,14 +48,13 @@ def _sanitize_requirements(self, requirements: list[str]) -> list[str]:

return sanitized_lines

# FIXME: type return value properly to dict[str, set[str]]
def parse_requirements(self, raw_requirements: str) -> dict:
def parse_requirements(self, raw_requirements: str) -> dict[str, set[str]]:
"""
Parses requirements.txt specification and finds all valid
versions of each dependency
Args:
raw_requirements (List[str]): contents of requirements.txt file
raw_requirements (str): contents of requirements.txt file
Returns:
dict: mapping of dependencies to valid versions
Expand All @@ -67,80 +67,84 @@ def parse_requirements(self, raw_requirements: str) -> dict:
}
"""
requirements = raw_requirements.splitlines()
sanitized_requirements = self._sanitize_requirements(requirements)
dependencies = {}

def versions(package_name):
def get_matched_versions(versions: set[str], semver_range: str) -> set[str]:
    """
    Retrieves all versions that match a given version selector

    Args:
        versions (set[str]): versions available for the package
        semver_range (str): version specifier (or any other raw selector)
    Returns:
        set[str]: the matching versions, reduced to the highest one when
        VERIFY_EXHAUSTIVE_DEPENDENCIES is falsy; the raw selector alone
        when it cannot be parsed as a specifier
    """
    # Only a malformed selector falls back to the raw value.
    # NOTE(review): Specifier parses a single clause; a comma-separated
    # selector presumably ends up in this fallback too - confirm intended.
    try:
        spec = Specifier(semver_range)
    except ValueError:
        # use it raw
        return {semver_range}

    candidates = []
    for raw_version in versions:
        try:
            candidates.append(Version(raw_version))
        except ValueError:
            # A malformed release string is skipped rather than being
            # mistaken for a malformed selector
            continue

    # Filters to specified versions
    result = list(spec.filter(candidates))

    # If just the best matched version scan is required we only keep one
    if not VERIFY_EXHAUSTIVE_DEPENDENCIES and result:
        result = [max(result)]

    return {str(r) for r in result}

def find_all_versions(package_name: str) -> set[str]:
    """
    This helper function retrieves all versions available for the package

    Args:
        package_name (str): name of the project on PyPI
    Returns:
        set[str]: every release version listed in the metadata; an empty
        set when the request does not return status 200
    """
    url = "https://pypi.org/pypi/%s/json" % (package_name,)
    log.debug(f"Retrieving PyPI package metadata information from {url}")
    # A single request, whose status is checked before the body is decoded
    response = requests.get(url)
    if response.status_code != 200:
        log.debug(f"No version available, status code {response.status_code}")
        return set()

    data = response.json()
    # Sorting before building a set has no effect, so the keys are used
    # directly; a missing "releases" entry yields an empty set
    versions = set(data.get("releases", {}))
    log.debug(f"Retrieved versions {', '.join(versions)}")
    return versions

sanitized_requirements = self._sanitize_requirements(requirements)

dependencies = {}

def safe_parse_requirements(req):
    """
    This helper function yields one valid requirement line at a time

    An entry that fails to parse is reported once on stderr and yielded as
    None so the consumer can skip it.
    """
    parsed = pkg_resources.parse_requirements(req)
    while True:
        try:
            yield next(parsed)
        except StopIteration:
            break
        except Exception as e:
            # Report the failure a single time per bad entry
            sys.stderr.write(
                f"Error when parsing requirements, received error {str(e)}. This entry will be "
                "ignored.\n"
            )
            yield None

try:
for requirement in safe_parse_requirements(sanitized_requirements):
if requirement is None:
continue
valid_versions = None
project_exists_on_pypi = True
for spec in requirement.specs:
qualifier, version = spec

try:
available_versions = versions(requirement.project_name) # type: list[str]
except Exception:
sys.stderr.write(f"Package {requirement.project_name} not on PyPI\n")
project_exists_on_pypi = False
continue

used_versions = None

match qualifier:
case ">":
used_versions = {v for v in available_versions if v > version}
case "<":
used_versions = {v for v in available_versions if v < version}
case ">=":
used_versions = {v for v in available_versions if v >= version}
case "<=":
used_versions = {v for v in available_versions if v <= version}
case "==":
matches = [re.search(version, candidate) for candidate in available_versions]
filtered_matches = list(filter(None, matches))
str_matches = [v.string for v in filtered_matches]
used_versions = set(str_matches)
case "~=":
prefix = "".join(version.split(".")[:-1])
for available_version in available_versions: # sorted decreasing
if available_version >= version and available_version.startswith(prefix):
used_versions = set(available_version)
break
case _:
sys.stderr.write(f"Unknown qualifier: {qualifier}")
continue

if valid_versions is None:
valid_versions = used_versions
else:
valid_versions = valid_versions & used_versions

if project_exists_on_pypi:
dependencies[requirement.project_name] = valid_versions

versions = get_matched_versions(
find_all_versions(requirement.project_name),
(
requirement.url
if requirement.url
else str(requirement.specifier)
),
)

if len(versions) == 0:
log.error(
f"Package/Version {requirement.project_name} not on PyPI\n"
)
continue

dependencies[requirement.project_name] = versions
except Exception as e:
sys.stderr.write(f"Received error {str(e)}")
log.error(f"Received error {str(e)}")

return dependencies
Loading

0 comments on commit 1ee3cad

Please sign in to comment.