Skip to content

Commit

Permalink
Merge branch 'main' into julien.doutre/go-support
Browse files Browse the repository at this point in the history
  • Loading branch information
juliendoutre committed Jul 17, 2024
2 parents 0a3ec93 + b228c66 commit 28de22b
Show file tree
Hide file tree
Showing 18 changed files with 2,252 additions and 410 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ jobs:
run: make test-metadata-rules
- name: Core unit tests
run: make test-core
- name: Reporters unit tests
run: make test-reporters
- name: Report coverage
run: make coverage-report

Expand Down
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.PHONY: test test-semgrep-rules test-metadata-rules test-core docs

test: test-semgrep-rules test-metadata-rules test-core coverage-report
test: test-semgrep-rules test-metadata-rules test-core test-reporters coverage-report

type-check:
mypy --install-types --non-interactive guarddog
Expand All @@ -18,8 +18,11 @@ test-metadata-rules:
test-core:
COVERAGE_FILE=.coverage_core coverage run -m pytest tests/core

test-reporters:
COVERAGE_FILE=.coverage_reporters coverage run -m pytest tests/reporters

coverage-report:
coverage combine .coverage_metadata .coverage_core
coverage combine .coverage_metadata .coverage_core .coverage_reporters
coverage report

docs:
Expand Down
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,49 @@ Metadata heuristics:

<!-- END_RULE_LIST -->

## Custom Rules

GuardDog allows you to implement custom source code rules.
Sourcecode rules live under the [guarddog/analyzer/sourcecode](guarddog/analyzer/sourcecode) directory, and supported formats are [Semgrep](https://github.com/semgrep/semgrep) or [Yara](https://github.com/VirusTotal/yara).

* Semgrep rules are language-dependent, and Guarddog will import all `.yml` rules where the language matches the ecosystem selected by the user in CLI.
* Yara rules on the other hand are language agnostic, therefore all matching `.yar` rules present will be imported.

It is then possible to write your own rule and drop it into that directory: GuardDog will let you select or exclude it like any built-in rule, and will append its findings to the output.

For example, you can create the following semgrep rule:
```yaml
rules:
- id: sample-rule
languages:
- python
message: Output message when rule matches
metadata:
description: Description used in the CLI help
patterns:
YOUR RULE HEURISTICS GO HERE
severity: WARNING
```
Then you'll need to save it as `sample-rule.yml`; note that the rule id must match the filename.

In the case of Yara, you can create the following rule:
```
rule sample-rule
{
meta:
description = "Description used in the output message"
target_entity = "file"
strings:
$exec = "exec"
condition:
1 of them
}
```
Then you'll need to save it as `sample-rule.yar`.

Note that in both cases the rule id must match the filename (without its extension).

## Running GuardDog in a GitHub Action

The easiest way to integrate GuardDog in your CI pipeline is to leverage the SARIF output format, and upload it to GitHub's [code scanning](https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/about-code-scanning) feature.
Expand Down
112 changes: 99 additions & 13 deletions guarddog/analyzer/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
import logging
import os
import subprocess
import yara # type: ignore
from collections import defaultdict
from pathlib import Path
from typing import Iterable, Optional
from typing import Iterable, Optional, Dict

from guarddog.analyzer.metadata import get_metadata_detectors
from guarddog.analyzer.sourcecode import SOURCECODE_RULES
from guarddog.analyzer.sourcecode import get_sourcecode_rules, SempgrepRule, YaraRule
from guarddog.ecosystems import ECOSYSTEM

SEMGREP_MAX_TARGET_BYTES = 10_000_000
Expand All @@ -24,6 +25,7 @@ class Analyzer:
ecosystem (str): name of the current ecosystem
metadata_ruleset (list): list of metadata rule names
sourcecode_ruleset (list): list of source code rule names
ioc_ruleset (list): list of ioc rule names
exclude (list): list of directories to exclude from source code search
Expand All @@ -32,14 +34,18 @@ class Analyzer:

def __init__(self, ecosystem=ECOSYSTEM.PYPI) -> None:
self.sourcecode_rules_path = os.path.join(os.path.dirname(__file__), "sourcecode")

self.ecosystem = ecosystem

# Rules and associated detectors
self.metadata_detectors = get_metadata_detectors(ecosystem)

self.metadata_ruleset: set[str] = set(self.metadata_detectors.keys())
self.sourcecode_ruleset: set[str] = set(rule["id"] for rule in SOURCECODE_RULES[ecosystem])
self.semgrep_ruleset: set[str] = set(
r.id for r in get_sourcecode_rules(ecosystem, SempgrepRule)
)
self.yara_ruleset: set[str] = set(
r.id for r in get_sourcecode_rules(ecosystem, YaraRule)
)

# Define paths to exclude from sourcecode analysis
self.exclude = [
Expand Down Expand Up @@ -77,10 +83,7 @@ def analyze(self, path, info=None, rules=None, name: Optional[str] = None, versi
sourcecode_results = None

# populate results, errors, and number of issues
log.debug(f"Running metadata rules against package '{name}'")
metadata_results = self.analyze_metadata(path, info, rules, name, version)

log.debug(f"Running source code rules against directory '{path}'")
sourcecode_results = self.analyze_sourcecode(path, rules)

# Concatenate dictionaries together
Expand All @@ -104,6 +107,8 @@ def analyze_metadata(self, path: str, info, rules=None, name: Optional[str] = No
dict[str]: map from each metadata rule and their corresponding output
"""

log.debug(f"Running metadata rules against package '{name}'")

all_rules = self.metadata_ruleset
if rules is not None:
# filtering the full ruleset witht the user's input
Expand Down Expand Up @@ -139,11 +144,87 @@ def analyze_sourcecode(self, path, rules=None) -> dict:
Returns:
dict[str]: map from each source code rule and their corresponding output
"""
semgrepscan_results = self.analyze_semgrep(path, rules)

yarascan_results = self.analyze_yara(path, rules)

# Concatenate dictionaries together
issues = semgrepscan_results["issues"] + yarascan_results["issues"]
results = semgrepscan_results["results"] | yarascan_results["results"]
errors = semgrepscan_results["errors"] | yarascan_results["errors"]

return {"issues": issues, "errors": errors, "results": results, "path": path}

def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict:
"""
Analyzes the IOCs of a given package
Args:
path (str): path to package
rules (set, optional): Set of IOC rules to analyze. Defaults to all rules.
Returns:
dict[str]: map from each IOC rule and their corresponding output
"""
log.debug(f"Running yara rules against directory '{path}'")

all_rules = self.yara_ruleset
if rules is not None:
# filtering the full ruleset witht the user's input
all_rules = self.yara_ruleset & rules

results = {rule: {} for rule in all_rules} # type: dict
errors: Dict[str, str] = {}
issues = 0

rules_path = {
rule_name: os.path.join(self.sourcecode_rules_path, f"{rule_name}.yar")
for rule_name in all_rules
}

if len(rules_path) == 0:
log.debug("No yara rules to run")
return {"results": results, "errors": errors, "issues": issues}

try:
scan_rules = yara.compile(filepaths=rules_path)

for root, _, files in os.walk(path):
for f in files:
matches = scan_rules.match(os.path.join(root, f))
for m in matches:
for s in m.strings:
for i in s.instances:
rule_results = {
"location": f"{f}:{i.offset}",
"code": self.trim_code_snippet(str(i.matched_data)),
'message': m.meta.get("description", f"{m.rule} rule matched")
}
issues += len(m.strings)
results[m.rule].update(rule_results)
except Exception as e:
errors["rules-all"] = f"failed to run rule: {str(e)}"

return {"results": results, "errors": errors, "issues": issues}

def analyze_semgrep(self, path, rules=None) -> dict:
"""
Analyzes the source code of a given package
Args:
path (str): path to directory of package
rules (set, optional): Set of source code rules to analyze. Defaults to all rules.
Returns:
dict[str]: map from each source code rule and their corresponding output
"""
log.debug(f"Running semgrep rules against directory '{path}'")

targetpath = Path(path)
all_rules = self.sourcecode_ruleset
all_rules = self.semgrep_ruleset
if rules is not None:
# filtering the full ruleset witht the user's input
all_rules = self.sourcecode_ruleset & rules
all_rules = self.semgrep_ruleset & rules

results = {rule: {} for rule in all_rules} # type: dict
errors = {}
Expand All @@ -155,11 +236,11 @@ def analyze_sourcecode(self, path, rules=None) -> dict:
))

if len(rules_path) == 0:
log.debug("No source code rules to run")
log.debug("No semgrep code rules to run")
return {"results": {}, "errors": {}, "issues": 0}

try:
log.debug(f"Running source code rules against {path}")
log.debug(f"Running semgrep code rules against {path}")
response = self._invoke_semgrep(target=path, rules=rules_path)
rule_results = self._format_semgrep_response(response, targetpath=targetpath)
issues += sum(len(res) for res in rule_results.values())
Expand Down Expand Up @@ -240,11 +321,16 @@ def _format_semgrep_response(self, response, rule=None, targetpath=None):
location = file_path + ":" + str(line)
code = self.trim_code_snippet(code_snippet)

results[rule_name].append({
finding = {
'location': location,
'code': code,
'message': result["extra"]["message"]
})
}

rule_results = results[rule_name]
if finding in rule_results:
continue
results[rule_name].append(finding)

return results

Expand Down
107 changes: 89 additions & 18 deletions guarddog/analyzer/sourcecode/__init__.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,108 @@
import os
import pathlib
from dataclasses import dataclass
from typing import Optional, Iterable

import yaml
from yaml.loader import SafeLoader

from guarddog.ecosystems import ECOSYSTEM

current_dir = pathlib.Path(__file__).parent.resolve()
rule_file_names = list(
filter(
lambda x: x.endswith('yml'),
os.listdir(current_dir)
)
)

SOURCECODE_RULES = {
ECOSYSTEM.PYPI: list(),
ECOSYSTEM.NPM: list(),
ECOSYSTEM.GO: list(),
} # type: dict[ECOSYSTEM, list[dict]]

for file_name in rule_file_names:
# These data classes keep the rule-loading logic in one place: instead of
# passing a dict around and parsing it differently depending on the rule
# type, each rule kind gets its own typed structure.
@dataclass
class SourceCodeRule:
    """
    Base class for source code rules.
    """
    # Rule identifier; per README convention it matches the rule's filename stem.
    id: str
    # File name of the rule inside the sourcecode directory.
    file: str


@dataclass
class YaraRule(SourceCodeRule):
    """
    YARA rule. YARA rules are language-agnostic, so no fields are needed
    beyond the base class (id and file).
    """
    pass


@dataclass
class SempgrepRule(SourceCodeRule):
    """
    Semgrep rule. Semgrep rules are language-specific, so each carries the
    ecosystem it applies to; the parsed YAML content of the rule is
    accessible through rule_content.

    NOTE(review): the class name misspells "Semgrep"; renaming would break
    existing importers, so it is kept as-is.
    """
    # Taken from the rule's metadata.description ("" when absent).
    description: str
    # Ecosystem derived from the rule's "languages" entries at load time.
    ecosystem: ECOSYSTEM
    # Raw parsed YAML mapping for this single rule.
    rule_content: dict


def get_sourcecode_rules(
    ecosystem: ECOSYSTEM, kind: Optional[type] = None
) -> Iterable[SourceCodeRule]:
    """
    Yield the loaded source code rules matching an ecosystem and, optionally,
    a rule kind.

    Args:
        ecosystem: ecosystem to filter on, for rules that are ecosystem
            specific (rules without an `ecosystem` attribute always match)
        kind: rule class to filter on (e.g. YaraRule); None keeps every kind
    """
    for candidate in SOURCECODE_RULES:
        kind_ok = kind is None or isinstance(candidate, kind)
        # Rules without an ecosystem attribute (e.g. YARA) apply everywhere.
        ecosystem_ok = getattr(candidate, "ecosystem", ecosystem) == ecosystem
        if kind_ok and ecosystem_ok:
            yield candidate


SOURCECODE_RULES: list[SourceCodeRule] = list()

semgrep_rule_file_names = list(
filter(lambda x: x.endswith("yml"), os.listdir(current_dir))
)
# all yml files placed in the sourcecode directory are loaded as semgrep rules
# refer to README.md for more information
for file_name in semgrep_rule_file_names:
with open(os.path.join(current_dir, file_name), "r") as fd:
data = yaml.load(fd, Loader=SafeLoader)
for rule in data["rules"]:
for lang in rule["languages"]:
ecosystem = None
match lang:
case "python":
if rule not in SOURCECODE_RULES[ECOSYSTEM.PYPI]:
SOURCECODE_RULES[ECOSYSTEM.PYPI].append(rule)
ecosystem = ECOSYSTEM.PYPI
case "javascript" | "typescript" | "json":
if rule not in SOURCECODE_RULES[ECOSYSTEM.NPM]:
SOURCECODE_RULES[ECOSYSTEM.NPM].append(rule)
ecosystem = ECOSYSTEM.NPM
case "go":
if rule not in SOURCECODE_RULES[ECOSYSTEM.GO]:
SOURCECODE_RULES[ECOSYSTEM.GO].append(rule)
ecosystem = ECOSYSTEM.GO
case _:
continue

# avoids duplicates when multiple languages are supported by a rule
if not next(
filter(
lambda r: r.id == rule["id"],
get_sourcecode_rules(ecosystem, SempgrepRule),
),
None,
):
SOURCECODE_RULES.append(
SempgrepRule(
id=rule["id"],
ecosystem=ecosystem,
description=rule.get("metadata", {}).get("description", ""),
file=file_name,
rule_content=rule,
)
)

yara_rule_file_names = list(
filter(lambda x: x.endswith("yar"), os.listdir(current_dir))
)
# all yar files placed in the sourcecode directory are loaded as YARA rules
# refer to README.md for more information
for file_name in yara_rule_file_names:
SOURCECODE_RULES.append(YaraRule(id=pathlib.Path(file_name).stem, file=file_name))
Loading

0 comments on commit 28de22b

Please sign in to comment.