Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update validate output #370

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 33 additions & 26 deletions contentctl/actions/validate.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,48 @@
import pathlib

from contentctl.input.director import Director, DirectorOutputDto
from contentctl.objects.config import validate
from contentctl.enrichments.attack_enrichment import AttackEnrichment
from contentctl.enrichments.cve_enrichment import CveEnrichment
from contentctl.objects.atomic import AtomicEnrichment
from contentctl.objects.lookup import FileBackedLookup
from contentctl.helper.splunk_app import SplunkApp
from contentctl.helper.utils import Utils
from contentctl.input.director import Director, DirectorOutputDto, ValidationFailedError
from contentctl.objects.atomic import AtomicEnrichment
from contentctl.objects.config import validate
from contentctl.objects.data_source import DataSource
from contentctl.helper.splunk_app import SplunkApp
from contentctl.objects.lookup import FileBackedLookup


class Validate:
def execute(self, input_dto: validate) -> DirectorOutputDto:
director_output_dto = DirectorOutputDto(
AtomicEnrichment.getAtomicEnrichment(input_dto),
AttackEnrichment.getAttackEnrichment(input_dto),
CveEnrichment.getCveEnrichment(input_dto),
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
)
try:
director_output_dto = DirectorOutputDto(
AtomicEnrichment.getAtomicEnrichment(input_dto),
AttackEnrichment.getAttackEnrichment(input_dto),
CveEnrichment.getCveEnrichment(input_dto),
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
)

director = Director(director_output_dto)
director.execute(input_dto)
self.ensure_no_orphaned_files_in_lookups(
input_dto.path, director_output_dto
)
if input_dto.data_source_TA_validation:
self.validate_latest_TA_information(director_output_dto.data_sources)

director = Director(director_output_dto)
director.execute(input_dto)
self.ensure_no_orphaned_files_in_lookups(input_dto.path, director_output_dto)
if input_dto.data_source_TA_validation:
self.validate_latest_TA_information(director_output_dto.data_sources)
return director_output_dto

return director_output_dto
except ValidationFailedError:
# Just re-raise without additional output since we already formatted everything
raise SystemExit(1)

def ensure_no_orphaned_files_in_lookups(
self, repo_path: pathlib.Path, director_output_dto: DirectorOutputDto
Expand Down
162 changes: 134 additions & 28 deletions contentctl/input/director.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,29 @@
import os
import sys
from pathlib import Path
from dataclasses import dataclass, field
from pydantic import ValidationError
from pathlib import Path
from uuid import UUID
from contentctl.input.yml_reader import YmlReader

from contentctl.objects.detection import Detection
from contentctl.objects.story import Story
from pydantic import ValidationError

from contentctl.objects.baseline import Baseline
from contentctl.objects.investigation import Investigation
from contentctl.objects.playbook import Playbook
from contentctl.objects.deployment import Deployment
from contentctl.objects.macro import Macro
from contentctl.objects.lookup import LookupAdapter, Lookup
from contentctl.objects.atomic import AtomicEnrichment
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.data_source import DataSource
from contentctl.objects.dashboard import Dashboard
from contentctl.enrichments.attack_enrichment import AttackEnrichment
from contentctl.enrichments.cve_enrichment import CveEnrichment

from contentctl.helper.utils import Utils
from contentctl.input.yml_reader import YmlReader
from contentctl.objects.atomic import AtomicEnrichment
from contentctl.objects.baseline import Baseline
from contentctl.objects.config import validate
from contentctl.objects.dashboard import Dashboard
from contentctl.objects.data_source import DataSource
from contentctl.objects.deployment import Deployment
from contentctl.objects.detection import Detection
from contentctl.objects.enums import SecurityContentType
from contentctl.helper.utils import Utils
from contentctl.objects.investigation import Investigation
from contentctl.objects.lookup import Lookup, LookupAdapter
from contentctl.objects.macro import Macro
from contentctl.objects.playbook import Playbook
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.story import Story


@dataclass
Expand Down Expand Up @@ -93,6 +92,39 @@ def addContentToDictMappings(self, content: SecurityContentObject):
self.uuid_to_content_map[content.id] = content


class Colors:
HEADER = "\033[95m"
BLUE = "\033[94m"
CYAN = "\033[96m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
RED = "\033[91m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
END = "\033[0m"
MAGENTA = "\033[35m"
BRIGHT_MAGENTA = "\033[95m"

# Add fallback symbols for Windows
CHECK_MARK = "✓" if sys.platform != "win32" else "*"
WARNING = "⚠️" if sys.platform != "win32" else "!"
ERROR = "❌" if sys.platform != "win32" else "X"
ARROW = "🎯" if sys.platform != "win32" else ">"
TOOLS = "🛠️" if sys.platform != "win32" else "#"
DOCS = "📚" if sys.platform != "win32" else "?"
BULB = "💡" if sys.platform != "win32" else "i"
SEARCH = "🔍" if sys.platform != "win32" else "@"
ZAP = "⚡" if sys.platform != "win32" else "!"


class ValidationFailedError(Exception):
"""Custom exception for validation failures that already have formatted output."""

def __init__(self, message: str):
self.message = message
super().__init__(message)


class Director:
input_dto: validate
output_dto: DirectorOutputDto
Expand Down Expand Up @@ -255,18 +287,92 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
end="",
flush=True,
)
print("Done!")

if len(validation_errors) > 0:
errors_string = "\n\n".join(
[
f"File: {e_tuple[0]}\nError: {str(e_tuple[1])}"
for e_tuple in validation_errors
]
print("\n") # Clean separation
print(f"{Colors.BOLD}{Colors.BRIGHT_MAGENTA}╔{'═' * 60}╗{Colors.END}")
print(
f"{Colors.BOLD}{Colors.BRIGHT_MAGENTA}║{Colors.BLUE}{f'{Colors.SEARCH} Content Validation Summary':^60}{Colors.BRIGHT_MAGENTA}║{Colors.END}"
)
# print(f"The following {len(validation_errors)} error(s) were found during validation:\n\n{errors_string}\n\nVALIDATION FAILED")
# We quit after validation a single type/group of content because it can cause significant cascading errors in subsequent
# types of content (since they may import or otherwise use it)
raise Exception(
f"The following {len(validation_errors)} error(s) were found during validation:\n\n{errors_string}\n\nVALIDATION FAILED"
print(f"{Colors.BOLD}{Colors.BRIGHT_MAGENTA}╚{'═' * 60}╝{Colors.END}\n")

print(
f"{Colors.BOLD}{Colors.GREEN}✨ Validation Completed{Colors.END} – Issues detected in {Colors.RED}{Colors.BOLD}{len(validation_errors)}{Colors.END} files.\n"
)

for index, entry in enumerate(validation_errors, 1):
file_path, error = entry
width = max(70, len(str(file_path)) + 15)

# File header with numbered emoji
number_emoji = f"{index}️⃣"
print(f"{Colors.YELLOW}┏{'━' * width}┓{Colors.END}")
print(
f"{Colors.YELLOW}┃{Colors.BOLD} {number_emoji} File: {Colors.CYAN}{file_path}{Colors.END}{' ' * (width - len(str(file_path)) - 12)}{Colors.YELLOW}┃{Colors.END}"
)
print(f"{Colors.YELLOW}┗{'━' * width}┛{Colors.END}")

print(
f" {Colors.RED}{Colors.BOLD}{Colors.ZAP} Validation Issues:{Colors.END}"
)

if isinstance(error, ValidationError):
for err in error.errors():
error_msg = err.get("msg", "")
if "https://errors.pydantic.dev" in error_msg:
continue

# Clean error categorization
if "Field required" in error_msg:
print(
f" {Colors.YELLOW}{Colors.WARNING} Field Required: {err.get('loc', [''])[0]}{Colors.END}"
)
elif "Input should be" in error_msg:
print(
f" {Colors.MAGENTA}{Colors.ARROW} Invalid Value for {err.get('loc', [''])[0]}{Colors.END}"
)
if "permitted values:" in error_msg:
options = error_msg.split("permitted values:")[
-1
].strip()
print(f" Valid options: {options}")
elif "Extra inputs" in error_msg:
print(
f" {Colors.BLUE}❌ Unexpected Field: {err.get('loc', [''])[0]}{Colors.END}"
)
elif "Failed to find" in error_msg:
print(
f" {Colors.RED}🔍 Missing Reference: {error_msg}{Colors.END}"
)
else:
print(f" {Colors.RED}❌ {error_msg}{Colors.END}")
else:
print(f" {Colors.RED}❌ {str(error)}{Colors.END}")
print("")

# Clean footer with next steps
max_width = max(60, max(len(str(e[0])) + 15 for e in validation_errors))
print(f"{Colors.BOLD}{Colors.CYAN}╔{'═' * max_width}╗{Colors.END}")
print(
f"{Colors.BOLD}{Colors.CYAN}║{Colors.BLUE}{'🎯 Next Steps':^{max_width}}{Colors.CYAN}║{Colors.END}"
)
print(f"{Colors.BOLD}{Colors.CYAN}╚{'═' * max_width}╝{Colors.END}\n")

print(
f"{Colors.GREEN}{Colors.TOOLS} Fix the validation issues in the listed files{Colors.END}"
)
print(
f"{Colors.YELLOW}{Colors.DOCS} Check the documentation: {Colors.UNDERLINE}https://github.com/splunk/contentctl{Colors.END}"
)
print(
f"{Colors.BLUE}{Colors.BULB} Use --verbose for detailed error information{Colors.END}\n"
)

raise ValidationFailedError(
f"Validation failed with {len(validation_errors)} error(s)"
)

# Success case
print(
f"\r{f'{contentType.name.upper()} Progress'.rjust(23)}: [{progress_percent:3.0f}%]... {Colors.GREEN}{Colors.CHECK_MARK} Done!{Colors.END}"
)
Loading