Skip to content

Commit

Permalink
feat(config): load external tasks from template dir (#373)
Browse files Browse the repository at this point in the history
Fixes #46
  • Loading branch information
ocervell authored May 2, 2024
1 parent ab396a3 commit 0c63c02
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 34 deletions.
2 changes: 1 addition & 1 deletion secator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1105,5 +1105,5 @@ def integration(tasks, workflows, scans, test, debug):
@test.command()
def coverage():
"""Run coverage report."""
cmd = f'{sys.executable} -m coverage report -m --omit=*/site-packages/*,*/tests/*'
cmd = f'{sys.executable} -m coverage report -m --omit=*/site-packages/*,*/tests/*,*/templates/*'
run_test(cmd, 'coverage')
6 changes: 0 additions & 6 deletions secator/output_types/vulnerability.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,5 @@ def __repr__(self):
s = f'[dim]{s}[/]'
return rich_to_ansi(s)

# def __gt__(self, other):
# # favor httpx over other url info tools
# if self._source == 'httpx' and other._source != 'httpx':
# return True
# return super().__gt__(other)

def __str__(self):
return self.matched_at + ' -> ' + self.name
10 changes: 4 additions & 6 deletions secator/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from secator.utils import discover_internal_tasks, discover_external_tasks
INTERNAL_TASKS = discover_internal_tasks()
EXTERNAL_TASKS = discover_external_tasks()
ALL_TASKS = INTERNAL_TASKS + EXTERNAL_TASKS
from secator.utils import discover_tasks
TASKS = discover_tasks()
__all__ = [
cls.__name__
for cls in ALL_TASKS
for cls in TASKS
]
for cls in INTERNAL_TASKS:
for cls in TASKS:
exec(f'from .{cls.__name__} import {cls.__name__}')
42 changes: 29 additions & 13 deletions secator/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import inspect
import importlib
import itertools
import logging
import operator
Expand All @@ -8,7 +9,7 @@
import sys
import warnings
from datetime import datetime
from importlib import import_module

from inspect import isclass
from pathlib import Path
from pkgutil import iter_modules
Expand Down Expand Up @@ -138,7 +139,7 @@ def discover_internal_tasks():
if module_name.startswith('_'):
continue
try:
module = import_module(f'secator.tasks.{module_name}')
module = importlib.import_module(f'secator.tasks.{module_name}')
except ImportError as e:
console.print(f'[bold red]Could not import secator.tasks.{module_name}:[/]')
console.print(f'\t[bold red]{type(e).__name__}[/]: {str(e)}')
Expand All @@ -160,17 +161,32 @@ def discover_internal_tasks():

def discover_external_tasks():
"""Find external secator tasks."""
if not os.path.exists('config.secator'):
return []
with open('config.secator', 'r') as f:
classes = f.read().splitlines()
output = []
for cls_path in classes:
cls = import_dynamic(cls_path, cls_root='Command')
if not cls:
continue
# logger.warning(f'Added external tool {cls_path}')
output.append(cls)
sys.dont_write_bytecode = True
for path in CONFIG.dirs.templates.glob('**/*.py'):
try:
task_name = path.stem
module_name = f'secator.tasks.{task_name}'

# console.print(f'Importing module {module_name} from {path}')
spec = importlib.util.spec_from_file_location(module_name, path)
module = importlib.util.module_from_spec(spec)
# console.print(f'Adding module "{module_name}" to sys path')
sys.modules[module_name] = module

# console.print(f'Executing module "{module}"')
spec.loader.exec_module(module)

# console.print(f'Checking that {module} contains task {task_name}')
if not hasattr(module, task_name):
console.print(f'[bold orange1]Could not load external task "{task_name}" from module {path.name}[/] ({path})')
continue
cls = getattr(module, task_name)
console.print(f'[bold green]Successfully loaded external task "{task_name}"[/] ({path})')
output.append(cls)
except Exception as e:
console.print(f'[bold red]Could not load external module {path.name}. Reason: {str(e)}.[/] ({path})')
sys.dont_write_bytecode = False
return output


Expand All @@ -194,7 +210,7 @@ def import_dynamic(cls_path, cls_root='Command'):
"""
try:
package, name = cls_path.rsplit(".", maxsplit=1)
cls = getattr(import_module(package), name)
cls = getattr(importlib.import_module(package), name)
root_cls = inspect.getmro(cls)[-2]
if root_cls.__name__ == cls_root:
return cls
Expand Down
13 changes: 13 additions & 0 deletions secator/utils_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import contextlib
import json
import os
import sys
import unittest.mock

from fp.fp import FreeProxy
Expand Down Expand Up @@ -182,3 +183,15 @@ def _test_task_output(
raise

console.print('[bold green] ok[/]')


def clear_modules():
"""Clear all secator modules imports.
See https://stackoverflow.com/questions/7460363/re-import-module-under-test-to-lose-context for context.
"""
keys_to_delete = []
for k, _ in sys.modules.items():
if k.startswith('secator'):
keys_to_delete.append(k)
for k in keys_to_delete:
del sys.modules[k]
36 changes: 36 additions & 0 deletions tests/fixtures/ls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from secator.runners import Command
from secator.decorators import task
from secator.output_types import Vulnerability


@task()
class ls(Command):
cmd = 'ls -al'
output_types = [Vulnerability]
output_map = {
Vulnerability: {}
}

@staticmethod
def item_loader(self, line):
fields = ['permissions', 'link_count', 'owner', 'group', 'size', 'month', 'day', 'hour', 'path']
result = [c for c in line.split(' ') if c]
if len(result) != len(fields):
return None
data = {}
for ix, value in enumerate(result):
data[fields[ix]] = value

# Output vulnerabilities
permissions = data['permissions']
path = data['path']
full_path = f'{self.input}/{path}'
if permissions[-2] == 'w': # found a vulnerability !
yield Vulnerability(
name='World-writeable path',
severity='high',
confidence='high',
provider='ls',
matched_at=full_path,
extra_data={k: v for k, v in data.items() if k != 'path'}
)
4 changes: 4 additions & 0 deletions tests/fixtures/ls.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
type: workflow
name: ls
tasks:
ls:
11 changes: 3 additions & 8 deletions tests/unit/test_offline.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
import os
import sys
import unittest

from secator.utils_test import clear_modules


class TestOffline(unittest.TestCase):
def setUp(self):
try:
# This allows to drop the secator module loaded from other tests in order to reload the config with modified
# environment variables.
# See https://stackoverflow.com/questions/7460363/re-import-module-under-test-to-lose-context for context.
del sys.modules['secator']
except KeyError:
pass
clear_modules()
os.environ['SECATOR_OFFLINE_MODE'] = '1'

def test_offline_cve_lookup(self):
Expand Down
53 changes: 53 additions & 0 deletions tests/unit/test_template.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import unittest
from secator.config import CONFIG
from secator.output_types import Vulnerability
from secator.utils_test import FIXTURES_DIR, clear_modules
import os

import shutil


class TestTemplate(unittest.TestCase):
def setUp(self):
self.template_dir = CONFIG.dirs.templates
self.custom_task_path = self.template_dir / 'ls.py'
self.writeable_file = self.template_dir / 'test.txt'
self.custom_workflow_path = self.template_dir / 'ls.yml'
shutil.copy(f'{FIXTURES_DIR}/ls.py', self.custom_task_path)
shutil.copy(f'{FIXTURES_DIR}/ls.yml', self.custom_workflow_path)
self.writeable_file.touch()
os.chmod(self.writeable_file, 0o007)
self.expected_vuln = Vulnerability(
name='World-writeable path',
severity='high',
confidence='high',
provider='ls',
matched_at=f'{str(self.writeable_file)}',
_source='ls',
)
clear_modules()
self.maxDiff = None

def tearDown(self):
self.custom_task_path.unlink()
self.custom_workflow_path.unlink()
self.writeable_file.unlink()

def test_external_task(self):
from secator.tasks import ls
results = ls(str(self.template_dir)).run()
self.assertEqual(len(results), 1)
self.assertTrue(self.expected_vuln == Vulnerability.load(results[0].toDict()))

def test_external_workflow(self):
from secator.cli import ALL_WORKFLOWS
from secator.runners import Workflow
ls_workflow = None
for w in ALL_WORKFLOWS:
if w.name == 'ls':
ls_workflow = w
self.assertIsNotNone(ls_workflow)
results = Workflow(ls_workflow, targets=[str(self.template_dir)]).run()
self.assertEqual(len(results), 2)
self.assertTrue(self.expected_vuln == Vulnerability.load(results[1].toDict()))

0 comments on commit 0c63c02

Please sign in to comment.