Skip to content

Commit

Permalink
feat(refactor): improve performance, add on_interval hook, rework CLI…
Browse files Browse the repository at this point in the history
… opts (#473)
  • Loading branch information
ocervell authored Nov 7, 2024
1 parent a170cd0 commit 4a22a70
Show file tree
Hide file tree
Showing 34 changed files with 1,126 additions and 679 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ jobs:
- name: Run integration tests
run: |
secator test integration --test test_celery,test_worker,test_tasks
env:
SECATOR_DEBUG_COMPONENT: celery,runner

- name: Archive code coverage results
uses: actions/upload-artifact@v4
Expand Down
84 changes: 43 additions & 41 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
requires = ['hatchling']
build-backend = 'hatchling.build'

[project]
name = "secator"
version = "0.6.0"
authors = [{ name = "FreeLabz", email = "[email protected]" }]
readme = "README.md"
name = 'secator'
version = '0.6.0'
authors = [{ name = 'FreeLabz', email = '[email protected]' }]
readme = 'README.md'
description = "The pentester's swiss knife."
requires-python = ">=3.8"
requires-python = '>=3.8'
keywords = [
'cybersecurity',
'recon',
Expand All @@ -17,40 +17,42 @@ keywords = [
'automation'
]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Intended Audience :: Information Technology",
"License :: Free for non-commercial use",
"Operating System :: Unix",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
'Development Status :: 3 - Alpha',
'Intended Audience :: Developers',
'Intended Audience :: Information Technology',
'License :: Free for non-commercial use',
'Operating System :: Unix',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
]
dependencies = [
"beautifulsoup4 <= 5",
'beautifulsoup4 <= 5',
'celery < 6',
"cpe < 2",
"dotmap < 2",
"free-proxy < 2",
"furl < 3",
"humanize < 5",
"ifaddr < 1",
"jinja2 < 4",
"packaging < 25",
"python-dotenv < 2",
"pyyaml < 7",
"pydantic < 3",
"requests < 3",
"rich < 14",
"rich-click < 1.7",
"psutil < 7",
"tldextract < 6",
"typing_extensions < 5",
"validators < 1",
"xmltodict < 1"
'cpe < 2',
'dotmap < 2',
'free-proxy < 2',
'furl < 3',
'greenlet < 4',
'humanize < 5',
'ifaddr < 1',
'jinja2 < 4',
'packaging < 25',
'python-dotenv < 2',
'pyyaml < 7',
'pydantic < 3',
'requests < 3',
'rich < 14',
'rich-click < 1.7',
'psutil < 7',
'retry < 1',
'tldextract < 6',
'typing_extensions < 5',
'validators < 1',
'xmltodict < 1'
]

[project.optional-dependencies]
Expand Down Expand Up @@ -85,8 +87,8 @@ google = [
]

[project.scripts]
secator = "secator.cli:cli"
secator = 'secator.cli:cli'

[project.urls]
Homepage = "https://github.com/freelabz/secator"
Issues = "https://github.com/freelabz/secator/issues"
Homepage = 'https://github.com/freelabz/secator'
Issues = 'https://github.com/freelabz/secator/issues'
33 changes: 17 additions & 16 deletions secator/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from celery import Celery, chain, chord, signals
from celery.app import trace

# from pyinstrument import Profiler # TODO: make pyinstrument optional
from rich.logging import RichHandler
from retry import retry

from secator.config import CONFIG
from secator.output_types import Info, Warning, Error
Expand Down Expand Up @@ -111,6 +111,7 @@ def void(*args, **kwargs):
pass


@retry(Exception, tries=3, delay=2)
def update_state(celery_task, task, force=False):
"""Update task state to add metadata information."""
if task.sync:
Expand All @@ -123,7 +124,8 @@ def update_state(celery_task, task, force=False):
sub='celery.state',
id=celery_task.request.id,
obj={task.unique_name: task.status, 'count': task.self_findings_count},
obj_after=False
obj_after=False,
verbose=True
)
return celery_task.update_state(
state='RUNNING',
Expand Down Expand Up @@ -157,7 +159,8 @@ def break_task(task, task_opts, targets, results=[], chunk_size=1):
'',
obj={task.unique_name: 'CHUNKED', 'chunk_size': chunk_size, 'chunks': len(chunks), 'target_count': len(targets)},
obj_after=False,
sub='celery.state'
sub='celery.state',
verbose=True
)

# Clone opts
Expand All @@ -173,11 +176,13 @@ def break_task(task, task_opts, targets, results=[], chunk_size=1):
opts['chunk'] = ix + 1
opts['chunk_count'] = len(chunks)
task_id = str(uuid.uuid4())
opts['has_parent'] = True
opts['enable_duplicate_check'] = False
sig = type(task).s(chunk, **opts).set(queue=type(task).profile, task_id=task_id)
full_name = f'{task.name}_{ix + 1}'
task.add_subtask(task_id, task.name, f'{task.name}_{ix + 1}')
info = Info(message=f'Celery chunked task created: {task_id}', _source=full_name, _uuid=str(uuid.uuid4()))
task.results.append(info)
task.add_result(info)
sigs.append(sig)

# Build Celery workflow
Expand All @@ -198,27 +203,23 @@ def break_task(task, task_opts, targets, results=[], chunk_size=1):

@app.task(bind=True)
def run_task(self, args=[], kwargs={}):
debug(f'Received task with args {args} and kwargs {kwargs}', sub="celery", level=2)
if 'context' not in kwargs:
kwargs['context'] = {}
debug(f'Received task with args {args} and kwargs {kwargs}', sub="celery.run", verbose=True)
kwargs['context']['celery_id'] = self.request.id
task = Task(*args, **kwargs)
task.run()


@app.task(bind=True)
def run_workflow(self, args=[], kwargs={}):
debug(f'Received workflow with args {args} and kwargs {kwargs}', sub="celery", level=2)
if 'context' not in kwargs:
kwargs['context'] = {}
debug(f'Received workflow with args {args} and kwargs {kwargs}', sub="celery.run", verbose=True)
kwargs['context']['celery_id'] = self.request.id
workflow = Workflow(*args, **kwargs)
workflow.run()


@app.task(bind=True)
def run_scan(self, args=[], kwargs={}):
debug(f'Received scan with args {args} and kwargs {kwargs}', sub="celery", level=2)
debug(f'Received scan with args {args} and kwargs {kwargs}', sub="celery.run", verbose=True)
if 'context' not in kwargs:
kwargs['context'] = {}
kwargs['context']['celery_id'] = self.request.id
Expand Down Expand Up @@ -282,7 +283,8 @@ def run_command(self, results, name, targets, opts={}):
},
obj_after=False,
id=self.request.id,
sub='celery.state'
sub='celery.state',
verbose=True
)

# Chunk task if needed
Expand All @@ -306,14 +308,13 @@ def run_command(self, results, name, targets, opts={}):
error = Error.from_exception(e)
error._source = task.unique_name
error._uuid = str(uuid.uuid4())
task._print_item(error)
task.stop_live_tasks()
task.results.append(error)
task.add_result(error, print=True)
task.stop_celery_tasks()

finally:
update_state(self, task, force=True)
gc.collect()
debug('', obj={task.unique_name: task.status, 'results': task.results}, sub='debug.celery.results')
debug('', obj={task.unique_name: task.status, 'results': task.results}, sub='celery.results', verbose=True)
return task.results


Expand Down
45 changes: 25 additions & 20 deletions secator/celery_utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
from contextlib import nullcontext
from time import sleep

import kombu
import kombu.exceptions

from celery.result import AsyncResult, GroupResult
from greenlet import GreenletExit
from rich.panel import Panel
from rich.padding import Padding

from rich.progress import Progress as RichProgress, SpinnerColumn, TextColumn, TimeElapsedColumn
from contextlib import nullcontext
from secator.config import CONFIG
from secator.definitions import STATE_COLORS
from secator.utils import debug, traceback_as_string
from secator.output_types import Error
from secator.rich import console
from secator.config import CONFIG
import kombu
import kombu.exceptions
from time import sleep
from secator.utils import debug, traceback_as_string


class CeleryData(object):
Expand Down Expand Up @@ -45,8 +50,6 @@ def get_renderables(self):
border_style='bold gold3',
expand=False,
highlight=True), pad=(2, 0, 0, 0))
from rich.console import Console
console = Console()
progress = PanelProgress(
SpinnerColumn('dots'),
TextColumn('{task.fields[descr]} ') if description else '',
Expand Down Expand Up @@ -116,11 +119,15 @@ def poll(result, ids_map, refresh_interval):
try:
yield from CeleryData.get_all_data(result, ids_map)
if result.ready():
debug('RESULT READY', sub='celery.runner', id=result.id)
debug('result is ready', sub='celery.poll', id=result.id)
yield from CeleryData.get_all_data(result, ids_map)
break
except kombu.exceptions.DecodeError:
debug('kombu decode error', sub='debug.celery', id=result.id)
except (KeyboardInterrupt, GreenletExit):
debug('encounted KeyboardInterrupt or GreenletExit', sub='celery.poll')
raise
except Exception as e:
error = Error.from_exception(e)
debug(repr(error), sub='celery.poll')
pass
finally:
sleep(refresh_interval)
Expand All @@ -133,7 +140,6 @@ def get_all_data(result, ids_map):
dict: Subtasks state and results.
"""
task_ids = list(ids_map.keys())
datas = []
for task_id in task_ids:
data = CeleryData.get_task_data(task_id, ids_map)
if not data:
Expand All @@ -143,10 +149,9 @@ def get_all_data(result, ids_map):
sub='celery.poll',
id=data['id'],
obj={data['full_name']: data['state'], 'count': data['count']},
level=4
verbose=True
)
yield data
datas.append(data)

# Calculate and yield parent task progress
# if not datas:
Expand Down Expand Up @@ -183,7 +188,7 @@ def get_task_data(task_id, ids_map):
# Get remote result
res = AsyncResult(task_id)
if not res:
debug('empty response', sub='debug.celery', id=task_id)
debug('empty response', sub='celery.data', id=task_id)
return

# Set up task state
Expand All @@ -202,14 +207,14 @@ def get_task_data(task_id, ids_map):
# - If it's a dict, it's the custom user metadata.

if isinstance(info, Exception):
debug('unhandled exception', obj={'msg': str(info), 'tb': traceback_as_string(info)}, sub='debug.celery', id=task_id)
debug('unhandled exception', obj={'msg': str(info), 'tb': traceback_as_string(info)}, sub='celery.data', id=task_id)
raise info

elif isinstance(info, list):
data['results'] = info
errors = [e for e in info if e._type == 'error']
status = 'FAILURE' if errors else 'SUCCESS'
data['count'] = len([c for c in info if c._source == data['name']])
data['count'] = len([c for c in info if c._source.startswith(data['name'])])
data['state'] = status

elif isinstance(info, dict):
Expand All @@ -226,7 +231,7 @@ def get_task_data(task_id, ids_map):
if progresses:
data['progress'] = progresses[-1].percent

debug('data', obj=data, sub='debug.celery.data', id=task_id)
debug('data', obj=data, sub='celery.data', id=task_id, verbose=True)
return data

@staticmethod
Expand Down Expand Up @@ -258,6 +263,6 @@ def get_task_ids(result, ids=[]):
if hasattr(result, 'parent') and result.parent:
CeleryData.get_task_ids(result.parent, ids=ids)

except kombu.exceptions.DecodeError as e:
console.print(f'[bold red]{str(e)}. Aborting get_task_ids.[/]')
except kombu.exceptions.DecodeError:
debug('kombu decode error', sub='celery.data.get_task_ids')
return
Loading

0 comments on commit 4a22a70

Please sign in to comment.